Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138813373

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
@@ -0,0 +1,132 @@
+ [standard Apache License 2.0 header]
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.lang.RandomStringUtils
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingCleanupUtil
+
+object CarbonStreamingIngestFileSourceExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+    // val csvDataFile = s"$csvDataDir/sampleData.csv"
+    // val csvDataFile = s"$csvDataDir/sample.csv"
+    val streamTableName = s"_carbon_file_stream_table_"
+    val stremTablePath = s"$storeLocation/default/$streamTableName"
+    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // cleanup any residual files
+    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonFileStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // Writes Dataframe to CarbonData file:
+    import spark.implicits._
+    import org.apache.spark.sql.types._
+
+    // Generate random data
+    val dataDF = spark.sparkContext.parallelize(1 to 10)
+      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
+      toDF("id", "name", "city", "salary")
+
+    // drop table if exists previously
+    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+
+    // Create Carbon Table
+    // Saves dataframe to carbondata file
+    dataDF.write

--- End diff --

Yes, will add more comments.

---
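For illustration, a sketch of how the batch write above might be commented once the author adds those comments; it reuses dataDF, streamTableName and SaveMode from the quoted example, and the option descriptions are best-effort readings rather than authoritative documentation:

```scala
// Sketch: the batch write from the example, annotated (option meanings are best-effort readings).
dataDF.write
  .format("carbondata")                    // write through the CarbonData data source
  .option("tableName", streamTableName)    // target Carbon table
  .option("compress", "true")              // compression option used by the example
  .option("tempCSV", "false")              // load the DataFrame directly, without an intermediate CSV dump
  .mode(SaveMode.Overwrite)                // recreate the table contents on every run
  .save()
```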
Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138813473

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingCleanupUtil.scala ---
@@ -0,0 +1,45 @@
+ [standard Apache License 2.0 header]
+
+package org.apache.carbondata.examples.utils
+
+import java.io.IOException
+
+import scala.tools.nsc.io.Path
+
+// scalastyle:off println
+object StreamingCleanupUtil {

--- End diff --

I agree, CSV data generation can be part of the utility object. I'll change it.

---
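For illustration, a hedged sketch of what moving the CSV data generation into a utility object could look like; the object and method names here are hypothetical, not from the PR, and a SparkSession is assumed to be created by the caller as in the example's main():

```scala
// Hypothetical sketch only; object/method names are illustrative, not from the PR.
import org.apache.spark.sql.SparkSession

object StreamingExampleUtil {

  /** Generates a small CSV dataset under csvDataDir, to be used later as a file stream source. */
  def generateCsvData(spark: SparkSession, csvDataDir: String, startId: Int, endId: Int): Unit = {
    import spark.implicits._
    spark.sparkContext.parallelize(startId to endId)
      .map(id => (id, s"name_$id", s"city_$id", 10000.00 * id))
      .toDF("id", "name", "city", "salary")
      .write
      .option("header", "true")   // keep the header row so readers can recover column names
      .mode("overwrite")          // assumption: overwriting leftovers from a previous run is acceptable
      .csv(csvDataDir)            // native Spark 2.x CSV writer, no external package needed
  }
}
```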
Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138813548

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
@@ -0,0 +1,132 @@
+ [... same leading context as in the first excerpt above, up to the batch write ...]
+
+    // Create Carbon Table
+    // Saves dataframe to carbondata file
+    dataDF.write
+      .format("carbondata")
+      .option("tableName", streamTableName)
+      .option("compress", "true")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+    // Create csv data frame file
+    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
+      .map(id => (id,
+        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
+        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
+        10000.00*id)).toDF("id", "name", "city", "salary")
+
+    // write data into csv file ( It will be used as a stream source)
+    csvDataDF.write.
+      format("com.databricks.spark.csv").

--- End diff --

Sure, df.write.csv() will make it more readable since it is native to Spark 2.x.

---
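A minimal sketch of the agreed change, writing the generated DataFrame with the CSV writer that is native to Spark 2.x instead of the external com.databricks.spark.csv package; csvDataDF and csvDataDir are the names from the quoted example:

```scala
// Sketch of the suggested rewrite; csvDataDF and csvDataDir come from the example above.
csvDataDF.write
  .option("header", "true")   // header row lets the streaming reader recover column names
  .mode("overwrite")          // assumption: overwriting leftover files from a previous run is acceptable
  .csv(csvDataDir)            // DataFrameWriter.csv() is native to Spark 2.x
```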
Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138813656

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
@@ -0,0 +1,132 @@
+ [... license header and imports as in the first excerpt above ...]
+
+object CarbonStreamingIngestFileSourceExample {

--- End diff --

Sure, I'll add the description.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138836834

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
@@ -0,0 +1,132 @@
+ [... same leading context as in the excerpts above, up to the CSV write ...]
+
+    // write data into csv file ( It will be used as a stream source)
+    csvDataDF.write.
+      format("com.databricks.spark.csv").
+      option("header", "true").
+      save(csvDataDir)
+
+    // define custom schema
+    val inputSchema = new StructType().
+      add("id", "integer").
+      add("name", "string").
+      add("city", "string").
+      add("salary", "float")
+
+    // Read csv data file as a streaming source
+    val csvReadDF = spark.readStream.

--- End diff --

Agreed.

---
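For context, a sketch of how the CSV directory can then be consumed as a streaming source with the explicit schema defined above; this uses the native Spark 2.x csv source and, purely for illustration, the console sink rather than the CarbonData sink under review. spark, inputSchema, csvDataDir and ckptLocation are the names from the quoted example:

```scala
// Sketch only: read the CSV directory as a file stream using the inputSchema defined above,
// and write to the console sink for illustration (the PR's example targets a CarbonData table instead).
val csvStreamDF = spark.readStream
  .format("csv")                        // native Spark 2.x CSV source
  .option("header", "true")
  .schema(inputSchema)                  // streaming file sources require an explicit schema
  .load(csvDataDir)

val query = csvStreamDF.writeStream
  .format("console")
  .option("checkpointLocation", ckptLocation)
  .start()

query.awaitTermination(30000)           // run for ~30 seconds in this illustrative sketch, then return
```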
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138838653

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
   def prepareWrite(

--- End diff --

Missing `override` keyword?

---
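A sketch of the fix being asked for, assuming prepareWrite is meant to override the corresponding method of the parent data source trait; the signature is the one quoted in the diff and the body is elided:

```scala
// Sketch: add the override modifier to the signature quoted in the diff; body unchanged.
override def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
  // ... validation logic as in the diff ...
  new CarbonStreamingOutputWriterFactory()
}
```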
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138838848

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
+ [... this hunk reworks prepareWrite() to validate the table path and to check the streaming data
+  schema against the existing carbon table schema, and adds the private helpers getTableSchema(),
+  validateSchema(), mapStreamingDataTypeToString(), validateTable() and checkIfTableExists();
+  the revised version of this hunk is quoted again in the review comments further below ...]
+
+  /**
+   * If use wants to stream data from carbondata table source
+   * and if following conditions are true:
+   * 1. No schema provided by the user in readStream()
+   * 2. spark.sql.streaming.schemaInference is set to true
+   * carbondata can infer a table schema from a valid table path
+   * The schema inference is not mandatory, but good have.
+   * When possible, this method should return the schema of the given `files`. When the format
+   * does not support inference, or no valid files are given should return None. In these cases
+   * Spark will require that user specify the schema manually.
+   */
   def inferSchema(

--- End diff --

Missing `override` keyword?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138839229

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
   def prepareWrite(

--- End diff --

And there is incorrect indentation at line 217.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r138840023

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
   def prepareWrite(

--- End diff --

I have rebased the streaming_ingest branch to the latest master, please rebase this PR.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r139074844

--- Diff: core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java ---
@@ -0,0 +1,108 @@
+ [standard Apache License 2.0 header]
+
+package org.apache.carbondata.core.streaming;
+
+/**
+ * Commit info for streaming writes
+ * The commit info can be used to recover valid offset in the file
+ * in the case of write failure.
+ */
+public class CarbonStreamingCommitInfo {
+
+  private String dataBase;
+
+  private String table;

--- End diff --

We should use tableID instead of the table name.

---
Github user aniketadnaik commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r139085031

--- Diff: core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java ---
@@ -0,0 +1,108 @@
+ [... same excerpt as above, ending at the `table` field ...]

--- End diff --

This file is from a previous PR. I guess my recent rebase has moved the HEAD of the streaming_ingest branch in my private repo; this needs to be corrected. Please hold off your review.

On a side note, this class just provides a framework for storing commit info; the actual fields can be changed at the time of implementation.

---
Github user aniketadnaik commented on the issue:
https://github.com/apache/carbondata/pull/1352

Please hold off review. The rebase has caused some issues; I'll need to fix them.

---
Github user aniketadnaik commented on the issue:
https://github.com/apache/carbondata/pull/1352

Please rebase the "streaming_ingest" branch from the latest "master" to take care of the presto test failures.

---
Github user aniketadnaik commented on the issue:
https://github.com/apache/carbondata/pull/1352

* Please merge this PR (Branch: **StreamIngest-1174**) to the "**streaming_ingest**" branch.
* Following is the build and test report.

$> mvn -Pspark-2.1 -Dspark.version=2.1.0 clean verify

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.829 s]
[INFO] Apache CarbonData :: Common ........................ SUCCESS [  4.067 s]
[INFO] Apache CarbonData :: Core .......................... SUCCESS [ 46.276 s]
[INFO] Apache CarbonData :: Processing .................... SUCCESS [ 13.728 s]
[INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 12.789 s]
[INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 29.121 s]
[INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [03:02 min]
[INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [08:54 min]
[INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  1.730 s]
[INFO] Apache CarbonData :: Hive .......................... SUCCESS [ 13.845 s]
[INFO] Apache CarbonData :: presto ........................ SUCCESS [ 34.772 s]
[INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  9.362 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14:44 min
[INFO] Finished at: 2017-09-20T18:47:44-07:00
[INFO] Final Memory: 174M/1827M
[INFO] ------------------------------------------------------------------------

$> mvn clean verify

[INFO] Reactor Summary:
[INFO]
[INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.839 s]
[INFO] Apache CarbonData :: Common ........................ SUCCESS [  3.985 s]
[INFO] Apache CarbonData :: Core .......................... SUCCESS [ 46.732 s]
[INFO] Apache CarbonData :: Processing .................... SUCCESS [ 13.321 s]
[INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 13.376 s]
[INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 29.065 s]
[INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [03:02 min]
[INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [08:45 min]
[INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  1.772 s]
[INFO] Apache CarbonData :: Hive .......................... SUCCESS [ 12.770 s]
[INFO] Apache CarbonData :: presto ........................ SUCCESS [ 38.861 s]
[INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  9.452 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14:39 min
[INFO] Finished at: 2017-09-20T18:30:11-07:00
[INFO] Final Memory: 173M/1628M
[INFO] ------------------------------------------------------------------------

---
Github user aniketadnaik commented on the issue:
https://github.com/apache/carbondata/pull/1352

Please review and merge this PR (Branch: StreamIngest-1174) to the "streaming_ingest" branch.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141342069

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -179,14 +192,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
   }

-  /**
-   * Returns the path of the table
-   *
-   * @param sparkSession
-   * @param dbName
-   * @param tableName
-   * @return
-   */
+/**
+ * Returns the path of the table
+ * @param sparkSession
+ * @param dbName
+ * @param tableName
+ * @return
+ */

--- End diff --

Please correct the indentation.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141343067

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * be put here. For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
-  def prepareWrite(
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+
+    // Check if table with given path exists
+    validateTable(options.get("path").get)
+ [... remainder of the revised hunk is quoted in the following excerpts ...]

--- End diff --

Change `options.get("path").get` to `options("path")`.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141343541

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
+ [... revised prepareWrite() as in the previous excerpt ...]
+
+  /**
+   * Read schema from existing carbon table
+   * @param sparkSession
+   * @param tablePath carbon table path
+   * @return true if schema validation is successful else false
+   */
+  private def getTableSchema(
       sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+      tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+        (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)

--- End diff --

Remove the unnecessary brackets and `toString`.

---
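A sketch of the cleanup being requested: the same storePath computation with plain string concatenation instead of the nested concat()/toString calls, behaviour otherwise unchanged (names follow the quoted code):

```scala
// Sketch of the cleanup: string concatenation with + instead of nested concat()/toString calls.
val storePath = formattedTablePath.substring(0,
  formattedTablePath.lastIndexOf(
    dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName) - 1)
```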
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141344066

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
+ [... revised prepareWrite() and getTableSchema() as in the previous excerpts ...]
+    factTable
+  }

   /**
+   * Validates streamed schema against existing table schema
+   * @param carbonTableSchema existing carbon table schema
+   * @param dataSchema streamed data schema
+   * @return true if schema validation is successful else false
+   */
+  private def validateSchema(
+      carbonTableSchema: org.apache.carbondata.format.TableSchema,
+      dataSchema: StructType): Boolean = {
+
+    import scala.collection.mutable.ListBuffer
+    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
+
+    var columnDataTypes = new ListBuffer[String]()
+    for(columnDataType <- columnnSchemaValues) {

--- End diff --

Use `columnSchemaValues.foreach` instead of for.

---
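A sketch of the suggested rewrite; the reviewer proposes foreach, and a map over the sorted column schemas is an equally concise variant that also drops the mutable ListBuffer (names follow the quoted code):

```scala
// Sketch: build the table-side datatype list without a mutable buffer or a for loop.
val tableColumnDataTypeList: List[String] =
  columnnSchemaValues.map(_.data_type.toString).toList
```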
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1352#discussion_r141344187

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
@@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
+ [... revised prepareWrite(), getTableSchema() and the start of validateSchema() as in the previous excerpts ...]
+
+    var columnDataTypes = new ListBuffer[String]()
+    for(columnDataType <- columnnSchemaValues) {
+      columnDataTypes.append(columnDataType.data_type.toString)
+    }
+    val tableColumnDataTypeList = columnDataTypes.toList
+
+    var streamSchemaDataTypes = new ListBuffer[String]()
+    for(i <- 0 until dataSchema.size) {

--- End diff --

Use `columnSchemaValues.foreach` instead of for.

---
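The analogous sketch for the streamed side: mapping over dataSchema.fields avoids both the index-based for loop and the mutable buffer (names follow the quoted code):

```scala
// Sketch: build the streamed-side datatype list by mapping over the StructType's fields.
val streamedDataTypeList: List[String] =
  dataSchema.fields.map(field => mapStreamingDataTypeToString(field.dataType.toString)).toList
```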