[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813373
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    --- End diff --
   
    Yes, I will add more comments.
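
    For illustration, the save step could be annotated roughly like this (a sketch only, not the final wording):

        // Create the Carbon table by saving the DataFrame as a CarbonData file;
        // it is used as the streaming target later in the example
        dataDF.write
          .format("carbondata")                  // CarbonData as the data source
          .option("tableName", streamTableName)  // table to create/overwrite
          .option("compress", "true")            // enable compression
          .option("tempCSV", "false")            // load directly, without a temp CSV
          .mode(SaveMode.Overwrite)              // replace any existing data
          .save()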


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813473
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingCleanupUtil.scala ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples.utils
    +
    +import java.io.IOException
    +
    +import scala.tools.nsc.io.Path
    +
    +// scalastyle:off println
    +object StreamingCleanupUtil {
    --- End diff --
   
    I agree, CSV data generation can be part of the utility object. I'll change it.
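
    A minimal sketch of how the generation could move into a utility object (the object and method names here are hypothetical, not the final code):

        package org.apache.carbondata.examples.utils

        import org.apache.commons.lang.RandomStringUtils
        import org.apache.spark.sql.{DataFrame, SparkSession}

        // Hypothetical helper that produces the random (id, name, city, salary)
        // rows used by the streaming examples
        object StreamingExampleDataUtil {
          def randomData(spark: SparkSession, ids: Range): DataFrame = {
            import spark.implicits._
            spark.sparkContext.parallelize(ids)
              .map(id => (id,
                s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
                s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
                10000.00 * id))
              .toDF("id", "name", "city", "salary")
          }
        }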


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813548
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    +      .format("carbondata")
    +      .option("tableName", streamTableName)
    +      .option("compress", "true")
    +      .option("tempCSV", "false")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
    +
    +    // Create csv data frame file
    +    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
    +      .map(id => (id,
    +        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
    +        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
    +        10000.00*id)).toDF("id", "name", "city", "salary")
    +
    +    // write data into csv file ( It will be used as a stream source)
    +    csvDataDF.write.
    +      format("com.databricks.spark.csv").
    --- End diff --
   
    Sure, `df.write.csv()` will make it more readable, since it's native to Spark 2.x.
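
    Something like the following, using the CSV writer built into Spark 2.x (same DataFrame and path as in the example):

        // write data into a CSV directory (it will be used as a stream source)
        csvDataDF.write
          .option("header", "true")
          .csv(csvDataDir)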


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813656
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    --- End diff --
   
    Sure, I'll add the description.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138836834
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    +      .format("carbondata")
    +      .option("tableName", streamTableName)
    +      .option("compress", "true")
    +      .option("tempCSV", "false")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
    +
    +    // Create csv data frame file
    +    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
    +      .map(id => (id,
    +        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
    +        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
    +        10000.00*id)).toDF("id", "name", "city", "salary")
    +
    +    // write data into csv file ( It will be used as a stream source)
    +    csvDataDF.write.
    +      format("com.databricks.spark.csv").
    +      option("header", "true").
    +      save(csvDataDir)
    +
    +    // define custom schema
    +    val inputSchema = new StructType().
    +      add("id", "integer").
    +      add("name", "string").
    +      add("city", "string").
    +      add("salary", "float")
    +
    +    // Read csv data file as a streaming source
    +    val csvReadDF = spark.readStream.
    --- End diff --
   
    Agreed.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138838653
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    --- End diff --
   
    missing `override` keyword?
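
    That is, presumably the declaration should read as follows (shown with the one-line body this diff replaces; `override` also lets the compiler verify the signature against the parent trait):

        override def prepareWrite(
            sparkSession: SparkSession,
            job: Job,
            options: Map[String, String],
            dataSchema: StructType): OutputWriterFactory =
          new CarbonStreamingOutputWriterFactory()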


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138838848
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val tablePath = options.get("path")
    +      val path: String = tablePath match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +      val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, path)
    +      val schemaPath = path + "/Metadata/schema"
    +      val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +      val isSchemaValid = validateSchema(schema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(sparkSession: SparkSession, tablePath: String): TableInfo = {
    +    val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, tablePath)
    +    val schemaPath = tablePath + "/Metadata/schema"
    +    val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +    schema
    +  }
    +
    +  /**
    +   * Validates streamed schema against existing table schema
    +   * @param schema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(schema: TableInfo, dataSchema: StructType): Boolean = {
    +    val factTable: TableSchema = schema.getFact_table
    +
    +    import scala.collection.mutable.ListBuffer
    +    import scala.collection.JavaConverters._
    +    var columnnSchemaValues = factTable.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    var streamSchemaDataTypes = new ListBuffer[String]()
    +    for(i <- 0 until dataSchema.size) {
    +      streamSchemaDataTypes
    +        .append(
    +          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
    +    }
    +    val streamedDataTypeList = streamSchemaDataTypes.toList
    +
    +    val isValid = tableColumnDataTypeList == streamedDataTypeList
    +    isValid
    +  }
    +
    +  /**
    +   * Parses streamed datatype according to carbon datatype
    +   * @param dataType
    +   * @return String
    +   */
    +  def mapStreamingDataTypeToString(dataType: String): String = {
    +    dataType match {
    +      case "IntegerType" => DataType.INT.toString
    +      case "StringType" => DataType.STRING.toString
    +      case "DateType" => DataType.DATE.toString
    +      case "DoubleType" => DataType.DOUBLE.toString
    +      case "FloatType" => DataType.DOUBLE.toString
    +      case "LongType" => DataType.LONG.toString
    +      case "ShortType" => DataType.SHORT.toString
    +      case "TimestampType" => DataType.TIMESTAMP.toString
    +    }
    +  }
    +
    +  /**
    +   * Validates if given table exists or throws exception
    +   * @param String existing carbon table path
    +   * @return None
    +   */
    +  private def validateTable(tablePath: String): Unit = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +    val absoluteTableIdentifier: AbsoluteTableIdentifier =
    +      new AbsoluteTableIdentifier(storePath,
    +        new CarbonTableIdentifier(dbName, tableName,
    +          UUID.randomUUID().toString))
    +
    +    if (!checkIfTableExists(absoluteTableIdentifier)) {
    +      throw new NoSuchTableException(dbName, tableName)
    +    }
    +  }
    +
    +  /**
    +   * Checks if table exists by checking its schema file
    +   * @param absoluteTableIdentifier
    +   * @return Boolean
    +   */
    +  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
    +    val carbonTablePath: CarbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(absoluteTableIdentifier)
    +    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
    +    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
    +  }
    +
    +  /**
    +   * If use wants to stream data from carbondata table source
    +   * and if following conditions are true:
    +   *    1. No schema provided by the user in readStream()
    +   *    2. spark.sql.streaming.schemaInference is set to true
    +   * carbondata can infer a table schema from a valid table path
    +   * The schema inference is not mandatory, but good have.
    +   * When possible, this method should return the schema of the given `files`.  When the format
    +   * does not support inference, or no valid files are given should return None.  In these cases
    +   * Spark will require that user specify the schema manually.
    +   */
       def inferSchema(
    --- End diff --
   
    missing `override` keyword?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138839229
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    --- End diff --
   
    And there is incorrect indentation at line 217


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138840023
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    --- End diff --
   
    I have rebased the streaming_ingest branch onto the latest master; please rebase this PR.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r139074844
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.streaming;
    +
    +/**
    + * Commit info for streaming writes
    + * The commit info can be used to recover valid offset in the file
    + * in the case of write failure.
    + */
    +public class CarbonStreamingCommitInfo {
    +
    +  private String dataBase;
    +
    +  private String table;
    --- End diff --
   
    We should use tableID instead of the table name.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r139085031
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.streaming;
    +
    +/**
    + * Commit info for streaming writes
    + * The commit info can be used to recover valid offset in the file
    + * in the case of write failure.
    + */
    +public class CarbonStreamingCommitInfo {
    +
    +  private String dataBase;
    +
    +  private String table;
    --- End diff --
   
    This file is from a previous PR. I guess my recent rebase has moved the HEAD of the streaming_ingest branch in my private repo; this needs to be corrected. Please hold off on your review. On a side note, this class is just meant to provide a framework for storing commit info; the actual values can be changed at implementation time.


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Please hold off on the review. The rebase has caused some issues, and I'll need to fix them.


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Please rebase the "streaming_ingest" branch from the latest "master" to take care of the Presto test failures.


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    * Please merge this PR (Branch: **StreamIngest-1174**) into the "**streaming_ingest**" branch.
   
    * The build and test report follows:
    $>mvn -Pspark-2.1 -Dspark.version=2.1.0 clean verify
    .
   
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.829 s]
    [INFO] Apache CarbonData :: Common ........................ SUCCESS [  4.067 s]
    [INFO] Apache CarbonData :: Core .......................... SUCCESS [ 46.276 s]
    [INFO] Apache CarbonData :: Processing .................... SUCCESS [ 13.728 s]
    [INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 12.789 s]
    [INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 29.121 s]
    [INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [03:02 min]
    [INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [08:54 min]
    [INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  1.730 s]
    [INFO] Apache CarbonData :: Hive .......................... SUCCESS [ 13.845 s]
    [INFO] Apache CarbonData :: presto ........................ SUCCESS [ 34.772 s]
    [INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  9.362 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 14:44 min
    [INFO] Finished at: 2017-09-20T18:47:44-07:00
    [INFO] Final Memory: 174M/1827M
    [INFO] ------------------------------------------------------------------------
   
   
   
    $>mvn clean verify
    [INFO] Reactor Summary:
    [INFO]
    [INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.839 s]
    [INFO] Apache CarbonData :: Common ........................ SUCCESS [  3.985 s]
    [INFO] Apache CarbonData :: Core .......................... SUCCESS [ 46.732 s]
    [INFO] Apache CarbonData :: Processing .................... SUCCESS [ 13.321 s]
    [INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 13.376 s]
    [INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 29.065 s]
    [INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [03:02 min]
    [INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [08:45 min]
    [INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  1.772 s]
    [INFO] Apache CarbonData :: Hive .......................... SUCCESS [ 12.770 s]
    [INFO] Apache CarbonData :: presto ........................ SUCCESS [ 38.861 s]
    [INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  9.452 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 14:39 min
    [INFO] Finished at: 2017-09-20T18:30:11-07:00
    [INFO] Final Memory: 173M/1628M
    [INFO] ------------------------------------------------------------------------


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user aniketadnaik commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Please review and merge this PR (Branch: StreamIngest-1174) into the "streaming_ingest" branch.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141342069
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -179,14 +192,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         }
       }
     
    -  /**
    -   * Returns the path of the table
    -   *
    -     * @param sparkSession
    -   * @param dbName
    -   * @param tableName
    -   * @return
    -   */
    +/**
    + * Returns the path of the table
    + * @param sparkSession
    + * @param dbName
    + * @param tableName
    + * @return
    + */
    --- End diff --
   
    please correct the indentation
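
    That is, restore the two-space alignment used by the surrounding members:

          /**
           * Returns the path of the table
           *
           * @param sparkSession
           * @param dbName
           * @param tableName
           * @return
           */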


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141343067
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    --- End diff --
   
    change `options.get("path").get` to `options("path")`
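
    For reference, `Map.apply` throws the same `NoSuchElementException` that `Option.get` would when the key is absent, so the shorter form behaves identically here:

        // options("path") is equivalent to options.get("path").get,
        // minus the Option detour
        validateTable(options("path"))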


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141343541
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    /* Check id streaming data schema matches with carbon table schema
    +     * Data from socket source does not have schema attached to it,
    +     * Following check is to ignore schema validation for socket source.
    +     */
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val path = options.get("path")
    +      val tablePath: String = path match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +
    +      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
    +        getTableSchema(sparkSession: SparkSession, tablePath: String)
    +      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(
         sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +    tablePath: String): org.apache.carbondata.format.TableSchema = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    --- End diff --
   
    remove the unnecessary brackets and `toString` calls
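
    A sketch of the same computation with the redundant grouping and `toString` calls dropped:

        // strip "/<dbName>/<tableName>" from the end to get the store path
        val storePath = formattedTablePath.substring(0,
          formattedTablePath.lastIndexOf(
            dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName) - 1)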


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141344066
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    /* Check id streaming data schema matches with carbon table schema
    +     * Data from socket source does not have schema attached to it,
    +     * Following check is to ignore schema validation for socket source.
    +     */
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val path = options.get("path")
    +      val tablePath: String = path match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +
    +      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
    +        getTableSchema(sparkSession: SparkSession, tablePath: String)
    +      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(
         sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +    tablePath: String): org.apache.carbondata.format.TableSchema = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +
    +    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
    +      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
    +
    +    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
    +    factTable
    +  }
     
       /**
    +   * Validates streamed schema against existing table schema
    +   * @param carbonTableSchema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(
    +      carbonTableSchema: org.apache.carbondata.format.TableSchema,
    +      dataSchema: StructType): Boolean = {
    +
    +    import scala.collection.mutable.ListBuffer
    +    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    --- End diff --
   
    use `columnnSchemaValues.foreach` instead of a `for` loop
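
    For example, the buffer and loop collapse into a single `map` (behavior unchanged):

        // collect the table's column data types in schema order
        val tableColumnDataTypeList =
          columnnSchemaValues.map(_.data_type.toString).toList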


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141344187
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    /* Check id streaming data schema matches with carbon table schema
    +     * Data from socket source does not have schema attached to it,
    +     * Following check is to ignore schema validation for socket source.
    +     */
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val path = options.get("path")
    +      val tablePath: String = path match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +
    +      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
    +        getTableSchema(sparkSession: SparkSession, tablePath: String)
    +      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(
         sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +    tablePath: String): org.apache.carbondata.format.TableSchema = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +
    +    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
    +      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
    +
    +    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
    +    factTable
    +  }
     
       /**
    +   * Validates streamed schema against existing table schema
    +   * @param carbonTableSchema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(
    +      carbonTableSchema: org.apache.carbondata.format.TableSchema,
    +      dataSchema: StructType): Boolean = {
    +
    +    import scala.collection.mutable.ListBuffer
    +    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    var streamSchemaDataTypes = new ListBuffer[String]()
    +    for(i <- 0 until dataSchema.size) {
    --- End diff --
   
    use `foreach` instead of the `for` loop here as well
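
    Similarly here, a `map` over `dataSchema.fields` can replace the indexed loop (sketch):

        // convert each streamed field's Spark type to its Carbon name, in order
        val streamedDataTypeList = dataSchema.fields
          .map(field => mapStreamingDataTypeToString(field.dataType.toString))
          .toList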


---