[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

GitHub user aniketadnaik opened a pull request:

    https://github.com/apache/carbondata/pull/1352

    [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

    - Description:
    This change is mainly targeted at the "streaming_ingest" development branch. The following changes are added on top of the previous framework changes (PR-1064):
    1. Schema validation of input data from a file source when a schema is specified. We validate the source schema against the existing table schema (see the sketch after this list). For a socket source, no schema validation is required since there is no schema attached to it.
    2. Added streaming examples for file-stream and socket-stream sources:
    CarbonStreamingIngestFileSourceExample.scala, CarbonStreamingIngestSocketSourceExample.scala
     These examples are added to facilitate development, to help understand and analyze the code flow. The examples will run in their entirety once carbondata is able to write into the carbondata file format.
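
    As a rough illustration of the validation in item 1, here is a minimal, self-contained sketch. The real logic lives in CarbonSource.prepareWrite and reads the table schema from the table's Metadata/schema file; the column names and expected type list below are hypothetical.

        import org.apache.spark.sql.types._

        object SchemaValidationSketch {

          // Map Spark SQL data types to carbon type names, mirroring the mapping in this PR
          // (note that FloatType maps to DOUBLE there).
          def toCarbonType(dt: DataType): String = dt match {
            case IntegerType   => "INT"
            case StringType    => "STRING"
            case DoubleType    => "DOUBLE"
            case FloatType     => "DOUBLE"
            case LongType      => "LONG"
            case ShortType     => "SHORT"
            case DateType      => "DATE"
            case TimestampType => "TIMESTAMP"
            case other         => other.simpleString.toUpperCase
          }

          def main(args: Array[String]): Unit = {
            // Schema attached to the incoming stream (hypothetical columns)
            val streamSchema = new StructType()
              .add("id", IntegerType)
              .add("name", StringType)
              .add("city", StringType)
              .add("salary", FloatType)

            // Column types of the existing carbon table, in schema order
            // (hard-coded here; the real code reads them from the schema file)
            val tableTypes = Seq("INT", "STRING", "STRING", "DOUBLE")

            val streamTypes = streamSchema.fields.map(f => toCarbonType(f.dataType)).toSeq
            val isValid = streamTypes == tableTypes
            println(s"schema valid: $isValid")  // the real path throws InvalidSchemaException instead
          }
        }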
   
    - Whether new unit test cases have been added or why no new tests are required?
      Yes, a new unit test for schema validation has been added.
   
    - What manual testing you have done?
    $> mvn clean -Pspark-2.1 -Dspark.version=2.1.0  verify
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.320 s]
    [INFO] Apache CarbonData :: Common ........................ SUCCESS [  1.509 s]
    [INFO] Apache CarbonData :: Core .......................... SUCCESS [ 26.109 s]
    [INFO] Apache CarbonData :: Processing .................... SUCCESS [  4.892 s]
    [INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [  8.910 s]
    [INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 13.876 s]
    [INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [02:29 min]
    [INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [07:06 min]
    [INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  1.724 s]
    [INFO] Apache CarbonData :: Flink Examples ................ SUCCESS [  2.480 s]
    [INFO] Apache CarbonData :: Hive .......................... SUCCESS [  4.776 s]
    [INFO] Apache CarbonData :: presto ........................ SUCCESS [  5.786 s]
    [INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  4.957 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 10:52 min
    [INFO] Finished at: 2017-09-12T10:50:40-07:00
    [INFO] Final Memory: 119M/1223M
    [INFO] ------------------------------------------------------------------------
   
      $> mvn clean verify
      [INFO] ------------------------------------------------------------------------
      [INFO] Reactor Summary:
      [INFO]
      [INFO] Apache CarbonData :: Parent ........................ SUCCESS [  6.925 s]
      [INFO] Apache CarbonData :: Common ........................ SUCCESS [ 10.383 s]
      [INFO] Apache CarbonData :: Core .......................... SUCCESS [02:07 min]
      [INFO] Apache CarbonData :: Processing .................... SUCCESS [ 21.376 s]
      [INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 18.568 s]
      [INFO] Apache CarbonData :: Spark Common .................. SUCCESS [01:03 min]
      [INFO] Apache CarbonData :: Spark ......................... SUCCESS [04:34 min]
      [INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [24:33 min]
      [INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  8.661 s]
      [INFO] Apache CarbonData :: Spark Examples ................ SUCCESS [ 22.520 s]
      [INFO] Apache CarbonData :: Flink Examples ................ SUCCESS [  6.592 s]
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD SUCCESS
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time: 33:55 min
      [INFO] Finished at: 2017-09-12T08:12:30-07:00
      [INFO] Final Memory: 62M/298M
      [INFO] ------------------------------------------------------------------------
        * Made sure write-path class invocation and schema validation happen correctly with Spark Structured Streaming (2.1) and a Parquet file source
        * Made sure the write-path execution workflow works with Structured Streaming (2.1) for both socket and file sources
   
    - Any additional information to help reviewers in testing this change.
    For an invalid schema, carbondata throws an exception and no record writer is instantiated. This is a first level of validation of the input streaming data at the CarbonSource entry point; another level of input data validation happens in the carbon load path anyway.
    Some file sources allow the schema to be inferred if "spark.sql.streaming.schemaInference" is set to true and no explicit schema is specified. In such a case we validate against the inferred schema. Carbondata also provides inferSchema functionality when a table path is provided. The inferSchema() functionality is used in the read path (readStream) and will become applicable when the read-path functionality is implemented.
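
    For context, here is a minimal sketch (runnable in spark-shell, with illustrative paths) of how a file-source stream can rely on that inference instead of an explicit schema:

        import org.apache.spark.sql.SparkSession

        // With spark.sql.streaming.schemaInference enabled, a CSV file source can be
        // read as a stream without an explicit .schema(...) call; the path is illustrative.
        val spark = SparkSession.builder()
          .master("local")
          .appName("SchemaInferenceSketch")
          .config("spark.sql.streaming.schemaInference", "true")
          .getOrCreate()

        val inferredDF = spark.readStream
          .format("csv")
          .option("header", "true")
          .load("/tmp/csvDataDir")

        inferredDF.printSchema()  // schema inferred from files already present in the directory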

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aniketadnaik/carbondataStreamIngest streamIngest-1174

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1352
   
----
commit ac4f9c2a3bd0c7e7569bde9ce797abcb424222a4
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-08T00:28:00Z

    [CARBONDATA-1174] Streaming Ingestion - Schema validation and Examples

commit 8e710b8b5265cc1b3db52deecfae2086cb46993b
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-09T00:32:02Z

    [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

commit 991d12aa0ec8ec58a5763f28ef6260c668b1f1c4
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-09T00:32:39Z

    [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

commit 61d283ef63faabdd97e90d0c5f6d862f073c5b2b
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-10T00:54:03Z

    [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

commit 6e24d4fa1af90bd61a4c1bb5bf80321135761973
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-12T01:59:48Z

    [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

commit 84fb1b76ce319841721db0ed8ef719b16d6c9acf
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-12T08:53:07Z

    [CARBONDATA-1174] Streaming Ingestion - Schema validation and Examples

commit 97646ae45defa1d09bcefa04ddd0497e9238e8fa
Author: Aniket Adnaik <[hidden email]>
Date:   2017-09-12T14:36:54Z

     [CARBONDATA-1174] Streaming Ingestion - Schema validation and Examples

----


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Can one of the admins verify this patch?


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Can one of the admins verify this patch?


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Can one of the admins verify this patch?


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Can one of the admins verify this patch?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138548603
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    --- End diff --
   
     Can you add a description to this example that briefly explains what it does?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138549147
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    +      .format("carbondata")
    +      .option("tableName", streamTableName)
    +      .option("compress", "true")
    +      .option("tempCSV", "false")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
    +
    +    // Create csv data frame file
    +    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
    +      .map(id => (id,
    +        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
    +        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
    +        10000.00*id)).toDF("id", "name", "city", "salary")
    +
    +    // write data into csv file ( It will be used as a stream source)
    +    csvDataDF.write.
    +      format("com.databricks.spark.csv").
    --- End diff --
   
     You are using spark2, right? In spark2 the CSV data source is built into Spark, so there is no need to use the `com.databricks.spark.csv` package. Just use `df.write.csv()`.
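
     A short sketch of that suggestion (runnable in spark-shell; the output path is illustrative):

         import org.apache.spark.sql.SparkSession

         // Spark 2.x ships CSV support built in, so no external package is needed.
         val spark = SparkSession.builder().master("local").appName("CsvWriteSketch").getOrCreate()
         import spark.implicits._

         val csvDataDF = Seq((11, "name_ABC", "city_XYZ", 110000.0)).toDF("id", "name", "city", "salary")
         csvDataDF.write
           .option("header", "true")
           .mode("overwrite")
           .csv("/tmp/csvDataDir")  // illustrative output directory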


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138549504
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingCleanupUtil.scala ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples.utils
    +
    +import java.io.IOException
    +
    +import scala.tools.nsc.io.Path
    +
    +// scalastyle:off println
    +object StreamingCleanupUtil {
    --- End diff --
   
     Is this used somewhere in the example, or does it need to be run manually?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138550083
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingCleanupUtil.scala ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples.utils
    +
    +import java.io.IOException
    +
    +import scala.tools.nsc.io.Path
    +
    +// scalastyle:off println
    +object StreamingCleanupUtil {
    --- End diff --
   
     I think it is better to move the CSV data generation to this utility object and rename it to `StreamingExampleUtil`.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138550496
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    --- End diff --
   
     Is this meant to mimic historical data? If yes, please mention it in a comment.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138551126
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    +      .format("carbondata")
    +      .option("tableName", streamTableName)
    +      .option("compress", "true")
    +      .option("tempCSV", "false")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
    +
    +    // Create csv data frame file
    +    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
    +      .map(id => (id,
    +        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
    +        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
    +        10000.00*id)).toDF("id", "name", "city", "salary")
    +
    +    // write data into csv file ( It will be used as a stream source)
    +    csvDataDF.write.
    +      format("com.databricks.spark.csv").
    +      option("header", "true").
    +      save(csvDataDir)
    +
    +    // define custom schema
    +    val inputSchema = new StructType().
    +      add("id", "integer").
    +      add("name", "string").
    +      add("city", "string").
    +      add("salary", "float")
    +
    +    // Read csv data file as a streaming source
    +    val csvReadDF = spark.readStream.
    --- End diff --
   
     Can we make this example run in a loop to mimic a real streaming case? For example, you could create 3 threads:
     Thread 1: keeps writing data into that CSV every 1 second
     Thread 2: reads the CSV file and writes to the carbondata table (writeStream)
     Thread 3: queries the carbondata table every 2 seconds
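
     A rough sketch of the producer side of that suggestion (runnable in spark-shell; the path, batch count and interval are illustrative, and the writeStream and periodic-query threads would follow the same pattern):

         import org.apache.spark.sql.SparkSession

         val spark = SparkSession.builder().master("local").appName("CsvProducerSketch").getOrCreate()
         import spark.implicits._

         val csvDataDir = "/tmp/csvDataDir"  // illustrative path, shared with the streaming query

         // Thread 1 from the suggestion: keep appending small CSV batches for the stream to pick up
         val producer = new Thread {
           override def run(): Unit = {
             for (batch <- 1 to 10) {  // bounded here so the sketch terminates
               val df = (1 to 5).map(i => (batch * 5 + i, s"name_$i", s"city_$i", 10000.0 * i))
                 .toDF("id", "name", "city", "salary")
               df.write.mode("append").option("header", "true").csv(csvDataDir)
               Thread.sleep(1000)  // new batch every second
             }
           }
         }
         producer.start()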



---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138557669
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala ---
    @@ -0,0 +1,43 @@
    +package org.apache.spark.carbondata.streaming
    +
    +import org.apache.hadoop.mapreduce.Job
    +
    +import org.apache.spark.sql.common.util.QueryTest
    +import org.apache.spark.sql.{CarbonSource, SparkSession}
    +import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
    +import org.apache.spark.sql.test.TestQueryExecutor
    +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    +
    +import org.scalatest.{BeforeAndAfterAll, FunSuite}
    +
    +
    +class CarbonSourceSchemaValidationTest extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll() {
    +    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
    +  }
    +
    +  test("Testing validate schema method with correct values ") {
    +
    +    val spark = SparkSession.builder
    +      .appName("StreamIngestSchemaValidation")
    +      .master("local")
    +      .getOrCreate()
    +
    +    val carbonSource = new CarbonSource
    +    val job = new Job()
    +    val storeLocation = TestQueryExecutor.storeLocation
    +
    +    println(s"Resource Path: $resourcesPath")
    --- End diff --
   
    remove println


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138558167
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    --- End diff --
   
    why not equal to 1?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138558272
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    --- End diff --
   
    Why check StringType only?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138558394
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val tablePath = options.get("path")
    +      val path: String = tablePath match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +      val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, path)
    +      val schemaPath = path + "/Metadata/schema"
    +      val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +      val isSchemaValid = validateSchema(schema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(sparkSession: SparkSession, tablePath: String): TableInfo = {
    +    val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, tablePath)
    +    val schemaPath = tablePath + "/Metadata/schema"
    +    val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +    schema
    +  }
    +
    +  /**
    +   * Validates streamed schema against existing table schema
    +   * @param schema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(schema: TableInfo, dataSchema: StructType): Boolean = {
    +    val factTable: TableSchema = schema.getFact_table
    +
    +    import scala.collection.mutable.ListBuffer
    +    import scala.collection.JavaConverters._
    +    var columnnSchemaValues = factTable.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    var streamSchemaDataTypes = new ListBuffer[String]()
    +    for(i <- 0 until dataSchema.size) {
    +      streamSchemaDataTypes
    +        .append(
    +          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
    +    }
    +    val streamedDataTypeList = streamSchemaDataTypes.toList
    +
    +    val isValid = tableColumnDataTypeList == streamedDataTypeList
    +    isValid
    +  }
    +
    +  /**
    +   * Parses streamed datatype according to carbon datatype
    +   * @param dataType
    +   * @return String
    +   */
    +  def mapStreamingDataTypeToString(dataType: String): String = {
    +    dataType match {
    +      case "IntegerType" => DataType.INT.toString
    +      case "StringType" => DataType.STRING.toString
    +      case "DateType" => DataType.DATE.toString
    +      case "DoubleType" => DataType.DOUBLE.toString
    +      case "FloatType" => DataType.DOUBLE.toString
    +      case "LongType" => DataType.LONG.toString
    +      case "ShortType" => DataType.SHORT.toString
    +      case "TimestampType" => DataType.TIMESTAMP.toString
    +    }
    +  }
    +
    +  /**
    +   * Validates if given table exists or throws exception
    +   * @param String existing carbon table path
    +   * @return None
    +   */
    +  private def validateTable(tablePath: String): Unit = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +    val absoluteTableIdentifier: AbsoluteTableIdentifier =
    +      new AbsoluteTableIdentifier(storePath,
    +        new CarbonTableIdentifier(dbName, tableName,
    +          UUID.randomUUID().toString))
    +
    +    if (!checkIfTableExists(absoluteTableIdentifier)) {
    +      throw new NoSuchTableException(dbName, tableName)
    +    }
    +  }
    +
    +  /**
    +   * Checks if table exists by checking its schema file
    +   * @param absoluteTableIdentifier
    +   * @return Boolean
    +   */
    +  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
    +    val carbonTablePath: CarbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(absoluteTableIdentifier)
    +    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
    +    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
    +  }
    +
    +  /**
    +   * If use wants to stream data from carbondata table source
    +   * and if following conditions are true:
    +   *    1. No schema provided by the user in readStream()
    +   *    2. spark.sql.streaming.schemaInference is set to true
    +   * carbondata can infer a table schema from a valid table path
    +   * The schema inference is not mandatory, but good have.
    +   * When possible, this method should return the schema of the given `files`.  When the format
    +   * does not support inference, or no valid files are given should return None.  In these cases
    +   * Spark will require that user specify the schema manually.
    +   */
       def inferSchema(
    --- End diff --
   
    Where is this function used?


---

[GitHub] carbondata issue #1352: [CARBONDATA-1174] Streaming Ingestion - schema valid...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1352
 
    Can one of the admins verify this patch?


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138812980
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val tablePath = options.get("path")
    +      val path: String = tablePath match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +      val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, path)
    +      val schemaPath = path + "/Metadata/schema"
    +      val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +      val isSchemaValid = validateSchema(schema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(sparkSession: SparkSession, tablePath: String): TableInfo = {
    +    val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, tablePath)
    +    val schemaPath = tablePath + "/Metadata/schema"
    +    val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +    schema
    +  }
    +
    +  /**
    +   * Validates streamed schema against existing table schema
    +   * @param schema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(schema: TableInfo, dataSchema: StructType): Boolean = {
    +    val factTable: TableSchema = schema.getFact_table
    +
    +    import scala.collection.mutable.ListBuffer
    +    import scala.collection.JavaConverters._
    +    var columnnSchemaValues = factTable.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    var streamSchemaDataTypes = new ListBuffer[String]()
    +    for(i <- 0 until dataSchema.size) {
    +      streamSchemaDataTypes
    +        .append(
    +          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
    +    }
    +    val streamedDataTypeList = streamSchemaDataTypes.toList
    +
    +    val isValid = tableColumnDataTypeList == streamedDataTypeList
    +    isValid
    +  }
    +
    +  /**
    +   * Parses streamed datatype according to carbon datatype
    +   * @param dataType
    +   * @return String
    +   */
    +  def mapStreamingDataTypeToString(dataType: String): String = {
    +    dataType match {
    +      case "IntegerType" => DataType.INT.toString
    +      case "StringType" => DataType.STRING.toString
    +      case "DateType" => DataType.DATE.toString
    +      case "DoubleType" => DataType.DOUBLE.toString
    +      case "FloatType" => DataType.DOUBLE.toString
    +      case "LongType" => DataType.LONG.toString
    +      case "ShortType" => DataType.SHORT.toString
    +      case "TimestampType" => DataType.TIMESTAMP.toString
    +    }
    +  }
    +
    +  /**
    +   * Validates if given table exists or throws exception
    +   * @param String existing carbon table path
    +   * @return None
    +   */
    +  private def validateTable(tablePath: String): Unit = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +    val absoluteTableIdentifier: AbsoluteTableIdentifier =
    +      new AbsoluteTableIdentifier(storePath,
    +        new CarbonTableIdentifier(dbName, tableName,
    +          UUID.randomUUID().toString))
    +
    +    if (!checkIfTableExists(absoluteTableIdentifier)) {
    +      throw new NoSuchTableException(dbName, tableName)
    +    }
    +  }
    +
    +  /**
    +   * Checks if table exists by checking its schema file
    +   * @param absoluteTableIdentifier
    +   * @return Boolean
    +   */
    +  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
    +    val carbonTablePath: CarbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(absoluteTableIdentifier)
    +    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
    +    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
    +  }
    +
    +  /**
    +   * If use wants to stream data from carbondata table source
    +   * and if following conditions are true:
    +   *    1. No schema provided by the user in readStream()
    +   *    2. spark.sql.streaming.schemaInference is set to true
    +   * carbondata can infer a table schema from a valid table path
    +   * The schema inference is not mandatory, but good have.
    +   * When possible, this method should return the schema of the given `files`.  When the format
    +   * does not support inference, or no valid files are given should return None.  In these cases
    +   * Spark will require that user specify the schema manually.
    +   */
       def inferSchema(
    --- End diff --
   
     This is used in the read path. It is called from DataSource -> sourceSchema() -> getOrInferFileFormatSchema() -> format.inferSchema(). We don't have read-path support ready yet. But if a user wants to stream data from "carbondata" as an input source (readStream.format("carbondata")), carbondata may have to provide inferSchema() when spark.sql.streaming.schemaInference is set to true and no external schema is provided. Again, this is not mandatory functionality, but it is good to have.
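
     For reference, a sketch of the read-path call that would eventually exercise inferSchema once readStream support lands (the table path is illustrative; this does not run today since the read path is not implemented yet):

         import org.apache.spark.sql.SparkSession

         // With spark.sql.streaming.schemaInference set to true and no explicit .schema(...),
         // Spark asks the format to infer the schema from the table path.
         val spark = SparkSession.builder()
           .master("local")
           .appName("CarbonReadStreamSketch")
           .config("spark.sql.streaming.schemaInference", "true")
           .getOrCreate()

         val streamDF = spark.readStream
           .format("carbondata")
           .load("/tmp/store/default/_carbon_file_stream_table_")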


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813012
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check id streaming data schema matches with carbon table schema
    +    // Data from socket source does not have schema attached to it,
    +    // Following check is to ignore schema validation for socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    --- End diff --
   
     Here is a bit of background: schema validation is not mandatory, but it provides early validation if the input data schema doesn't match the target table schema. With Spark's structured streaming, not all input sources provide a schema; only file sources have a schema attached to them. With file sources the user can either provide a schema via readStream.schema(), or the file source has to infer the schema internally if spark.sql.streaming.schemaInference is set to true. However, a socket streaming source has no real schema attached to it; everything arrives as a byte stream, and the schema for a socket source is always of size 1 and of type StringType. We need to bypass schema validation for the socket source. The schema validation happens on the executor side, where we don't have any information about the input source (whether it is a file source or a socket source); the executor only gets the schema and the data, hence this check is needed to skip schema validation for the socket source. This may not be a clean approach, but I could not find any better way to handle it. If you have any info, please let me know.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813057
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala ---
    @@ -0,0 +1,43 @@
    +package org.apache.spark.carbondata.streaming
    +
    +import org.apache.hadoop.mapreduce.Job
    +
    +import org.apache.spark.sql.common.util.QueryTest
    +import org.apache.spark.sql.{CarbonSource, SparkSession}
    +import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
    +import org.apache.spark.sql.test.TestQueryExecutor
    +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    +
    +import org.scalatest.{BeforeAndAfterAll, FunSuite}
    +
    +
    +class CarbonSourceSchemaValidationTest extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll() {
    +    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
    +  }
    +
    +  test("Testing validate schema method with correct values ") {
    +
    +    val spark = SparkSession.builder
    +      .appName("StreamIngestSchemaValidation")
    +      .master("local")
    +      .getOrCreate()
    +
    +    val carbonSource = new CarbonSource
    +    val job = new Job()
    +    val storeLocation = TestQueryExecutor.storeLocation
    +
    +    println(s"Resource Path: $resourcesPath")
    --- End diff --
   
    yes.


---

[GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...

Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138813289
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +
    +import org.apache.commons.lang.RandomStringUtils
    +import org.apache.spark.sql.{SaveMode, SparkSession}
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.examples.utils.StreamingCleanupUtil
    +
    +object CarbonStreamingIngestFileSourceExample {
    +
    +  def main(args: Array[String]) {
    +
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +      + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    +    val metastoredb = s"$rootPath/examples/spark2/target"
    +    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
    +    // val csvDataFile = s"$csvDataDir/sampleData.csv"
    +    // val csvDataFile = s"$csvDataDir/sample.csv"
    +    val streamTableName = s"_carbon_file_stream_table_"
    +    val stremTablePath = s"$storeLocation/default/$streamTableName"
    +    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
    +
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    +
    +    // cleanup any residual files
    +    StreamingCleanupUtil.main(Array(csvDataDir, ckptLocation))
    +
    +    import org.apache.spark.sql.CarbonSession._
    +    val spark = SparkSession
    +      .builder()
    +      .master("local")
    +      .appName("CarbonFileStreamingExample")
    +      .config("spark.sql.warehouse.dir", warehouse)
    +      .getOrCreateCarbonSession(storeLocation, metastoredb)
    +
    +    spark.sparkContext.setLogLevel("ERROR")
    +
    +    // Writes Dataframe to CarbonData file:
    +    import spark.implicits._
    +    import org.apache.spark.sql.types._
    +
    +    // Generate random data
    +    val dataDF = spark.sparkContext.parallelize(1 to 10)
    +      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id)).
    +      toDF("id", "name", "city", "salary")
    +
    +    // drop table if exists previously
    +    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
    +
    +    // Create Carbon Table
    +    // Saves dataframe to carbondata file
    +    dataDF.write
    +      .format("carbondata")
    +      .option("tableName", streamTableName)
    +      .option("compress", "true")
    +      .option("tempCSV", "false")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
    +
    +    // Create csv data frame file
    +    val csvDataDF = spark.sparkContext.parallelize(11 to 30)
    +      .map(id => (id,
    +        s"name_${RandomStringUtils.randomAlphabetic(4).toUpperCase}",
    +        s"city_${RandomStringUtils.randomAlphabetic(2).toUpperCase}",
    +        10000.00*id)).toDF("id", "name", "city", "salary")
    +
    +    // write data into csv file ( It will be used as a stream source)
    +    csvDataDF.write.
    +      format("com.databricks.spark.csv").
    +      option("header", "true").
    +      save(csvDataDir)
    +
    +    // define custom schema
    +    val inputSchema = new StructType().
    +      add("id", "integer").
    +      add("name", "string").
    +      add("city", "string").
    +      add("salary", "float")
    +
    +    // Read csv data file as a streaming source
    +    val csvReadDF = spark.readStream.
    --- End diff --
   
     I referred to the streaming examples from Spark and wanted to keep this similar and simple. However, it should be doable. At this point we don't have read support yet; maybe I will change it to have two threads, one writing to the CSV every few seconds and the other writing to the carbondata table. This example can be enhanced later to add read verification when read-path support is added.


---