Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141344321

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * be put here. For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
-  def prepareWrite(
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+
+    // Check if table with given path exists
+    validateTable(options.get("path").get)
+
+    /* Check id streaming data schema matches with carbon table schema
+     * Data from socket source does not have schema attached to it,
+     * Following check is to ignore schema validation for socket source.
+     */
+    if (!(dataSchema.size.equals(1) &&
+      dataSchema.fields(0).dataType.equals(StringType))) {
+      val path = options.get("path")
+      val tablePath: String = path match {
+        case Some(value) => value
+        case None => ""
+      }
+
+      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+        getTableSchema(sparkSession: SparkSession, tablePath: String)
+      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+      if(!isSchemaValid) {
+        LOGGER.error("Schema Validation Failed: streaming data schema" +
+          "does not match with carbon table schema")
+        throw new InvalidSchemaException("Schema Validation Failed : " +
+          "streaming data schema does not match with carbon table schema")
+      }
+    }
+    new CarbonStreamingOutputWriterFactory()
+  }
+
+  /**
+   * Read schema from existing carbon table
+   * @param sparkSession
+   * @param tablePath carbon table path
+   * @return true if schema validation is successful else false
+   */
+  private def getTableSchema(
       sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+      tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
+        .concat(tableName)).toString) - 1)
+
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
+
+    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
+    factTable
+  }

   /**
+   * Validates streamed schema against existing table schema
+   * @param carbonTableSchema existing carbon table schema
+   * @param dataSchema streamed data schema
+   * @return true if schema validation is successful else false
+   */
+  private def validateSchema(
+      carbonTableSchema: org.apache.carbondata.format.TableSchema,
+      dataSchema: StructType): Boolean = {
+
+    import scala.collection.mutable.ListBuffer
+    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
+
+    var columnDataTypes = new ListBuffer[String]()
+    for(columnDataType <- columnnSchemaValues) {
+      columnDataTypes.append(columnDataType.data_type.toString)
+    }
+    val tableColumnDataTypeList = columnDataTypes.toList
+
+    var streamSchemaDataTypes = new ListBuffer[String]()
+    for(i <- 0 until dataSchema.size) {
+      streamSchemaDataTypes
+        .append(
+          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
+    }
+    val streamedDataTypeList = streamSchemaDataTypes.toList
+
+    val isValid = tableColumnDataTypeList == streamedDataTypeList
+    isValid
+  }
+
+  /**
+   * Parses streamed datatype according to carbon datatype
+   * @param dataType
+   * @return String
+   */
+  def mapStreamingDataTypeToString(dataType: String): String = {
+    import org.apache.carbondata.format.DataType
+    dataType match {
+      case "IntegerType" => DataType.INT.toString
+      case "StringType" => DataType.STRING.toString
+      case "DateType" => DataType.DATE.toString
+      case "DoubleType" => DataType.DOUBLE.toString
+      case "FloatType" => DataType.DOUBLE.toString
+      case "LongType" => DataType.LONG.toString
+      case "ShortType" => DataType.SHORT.toString
+      case "TimestampType" => DataType.TIMESTAMP.toString
+    }
+  }
+
+  /**
+   * Validates if given table exists or throws exception
+   * @param String existing carbon table path
+   * @return None
+   */
+  private def validateTable(tablePath: String): Unit = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
+        .concat(tableName)).toString) - 1)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier =
+      new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName,
+          UUID.randomUUID().toString))
+
+    if (!checkIfTableExists(absoluteTableIdentifier)) {
+      throw new NoSuchTableException(dbName, tableName)
+    }
+  }
+
+  /**
+   * Checks if table exists by checking its schema file
+   * @param absoluteTableIdentifier
+   * @return Boolean
+   */
+  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+    val carbonTablePath: CarbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier)
+    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+  }
+
+  /**
+   * If user wants to stream data from carbondata table source
+   * and if following conditions are true:
+   * 1. No schema provided by the user in readStream()
+   * 2. spark.sql.streaming.schemaInference is set to true
+   * carbondata can infer a table schema from a valid table path
+   * The schema inference is not mandatory, but good have.
    * When possible, this method should return the schema of the given `files`. When the format
    * does not support inference, or no valid files are given should return None. In these cases
    * Spark will require that user specify the schema manually.
    */
-  def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+  override def inferSchema(
+    sparkSession: SparkSession,

--- End diff --

correct the indentation

---
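For reference, a minimal sketch of what the requested indentation fix might look like, assuming the four-space continuation indent that the new `prepareWrite` signature above already uses; the body is elided and purely illustrative:

```scala
// Parameters aligned with a four-space continuation indent,
// matching the style of the other signatures in this diff (assumed convention).
override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  // ... infer the schema from the carbon table at options("path") ...
}
```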
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141344765

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
+  /**
+   * If user wants to stream data from carbondata table source
+   * and if following conditions are true:
+   * 1. No schema provided by the user in readStream()
+   * 2. spark.sql.streaming.schemaInference is set to true
+   * carbondata can infer a table schema from a valid table path
+   * The schema inference is not mandatory, but good have.
    * When possible, this method should return the schema of the given `files`. When the format
    * does not support inference, or no valid files are given should return None. In these cases
    * Spark will require that user specify the schema manually.
    */
-  def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+  override def inferSchema(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    files: Seq[FileStatus]): Option[StructType] = {
+    Some(new StructType().add("value", StringType))
+    val path = options.get("path")
+    val tablePath: String = path match {
+      case Some(value) => value
+      case None => ""
+    }
+    // Check if table with given path exists
+    validateTable(tablePath)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+      getTableSchema(sparkSession: SparkSession, tablePath: String)
+    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
+
+    import scala.collection.mutable.ListBuffer
+    import scala.collection.JavaConverters._

--- End diff --

move import to head of file

---
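A sketch of the requested change, with the two method-local collection imports hoisted into the import section at the head of `CarbonSource.scala`. The neighbouring Spark imports are shown only for illustration and are assumptions about the file's existing import list:

```scala
// Head of CarbonSource.scala (illustrative): imports previously declared
// inside inferSchema() and validateSchema() moved to file level
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}
```

File-level imports keep the class's dependencies visible in one place instead of scattered through method bodies.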
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141345191

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
-  def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+  override def inferSchema(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    files: Seq[FileStatus]): Option[StructType] = {
+    Some(new StructType().add("value", StringType))
+    val path = options.get("path")
+    val tablePath: String = path match {
+      case Some(value) => value
+      case None => ""
+    }
+    // Check if table with given path exists
+    validateTable(tablePath)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+      getTableSchema(sparkSession: SparkSession, tablePath: String)
+    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
+
+    import scala.collection.mutable.ListBuffer
+    import scala.collection.JavaConverters._
+    val tableColumnNames = new ListBuffer[String]()
+    for (columnName <- columnnSchemaValues) {
+      tableColumnNames.append(columnName.column_name)
+    }
+    val tableColumnNamesList = tableColumnNames.toList
+
+    var columnDataTypes = new ListBuffer[String]()
+    for(columnDataType <- columnnSchemaValues) {
+      columnDataTypes.append(columnDataType.data_type.toString)
+    }
+    val tableColumnDataTypeList = columnDataTypes.toList
+
+    val inferredSchema: Option[StructType] = new Some(new StructType())
+    for (i <- tableColumnNamesList.indices) {
+      inferredSchema.get.add(tableColumnNamesList(i), tableColumnDataTypeList(i))

--- End diff --

It seems you can use `columnName.column_name` directly instead of `tableColumnNamesList(i)`

---
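A sketch of the suggested simplification: build the schema in a single pass over `columnnSchemaValues`, reading `column_name` and `data_type` directly instead of materializing the two intermediate lists. It also addresses a caveat the quoted loop runs into: Spark's `StructType.add` returns a new `StructType` rather than mutating the receiver, so the result must be reassigned or the added fields are silently lost:

```scala
import org.apache.spark.sql.types.StructType

// Single-pass schema construction over the sorted thrift column schemas;
// StructType.add returns a fresh StructType, so accumulate by reassignment.
var inferredSchema = new StructType()
for (columnSchema <- columnnSchemaValues) {
  inferredSchema = inferredSchema.add(columnSchema.column_name, columnSchema.data_type.toString)
}
Some(inferredSchema)
```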
Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352

All review comments have been addressed in the previous commit. Please review and merge.

    mvn -T 4C -Pspark-2.1 -Dspark.version=2.1.0 clean verify  - Successful
    mvn clean verify  - Successful

---
Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352

All review comments have been addressed in the latest commit. Please review and merge.

    mvn -T 4C -Pspark-2.1 -Dspark.version=2.1.0 clean verify  - Successful
    mvn clean verify  - Successful

---
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1352

LGTM. Thanks for working on this

---