[ https://issues.apache.org/jira/browse/CARBONDATA-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Geetika Gupta updated CARBONDATA-2002:
--------------------------------------

Description:

I created a streaming table and loaded data into it using the following commands in the spark shell:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")

carbon.sql("drop table if exists uniqdata_stream")

carbon.sql("create table uniqdata_stream(CUST_ID int, CUST_NAME String, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint, BIGINT_COLUMN2 bigint, DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'='256 MB', 'streaming'='true')")

import carbon.sqlContext.implicits._
import org.apache.spark.sql.types._

val uniqdataSch = StructType(Array(
  StructField("CUST_ID", IntegerType),
  StructField("CUST_NAME", StringType),
  StructField("DOB", TimestampType),
  StructField("DOJ", TimestampType),
  StructField("BIGINT_COLUMN1", LongType),
  StructField("BIGINT_COLUMN2", LongType),
  StructField("DECIMAL_COLUMN1", DecimalType(30, 10)),
  StructField("DECIMAL_COLUMN2", DecimalType(36, 10)),
  StructField("Double_COLUMN1", DoubleType),
  StructField("Double_COLUMN2", DoubleType),
  StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
  .schema(uniqdataSch)
  .option("sep", ",")
  .csv("file:///home/geetika/Downloads/uniqdata")

val qry = streamDf.writeStream.format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", "/stream/uniq")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream")
  .start()

qry.awaitTermination()
// Press Ctrl+C to terminate the query.

Start the spark shell again:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

carbon.sql("show segments for table uniqdata_stream").show

It shows the following output:

+-----------------+---------+--------------------+-------------+---------+-----------+
|SegmentSequenceId|   Status|     Load Start Time|Load End Time|Merged To|File Format|
+-----------------+---------+--------------------+-------------+---------+-----------+
|                0|Streaming|2018-01-05 18:23:...|         null|       NA|     ROW_V1|
+-----------------+---------+--------------------+-------------+---------+-----------+

The status for the segment is never updated to Finished or Success.
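For context on what would normally move a segment out of the Streaming state: a live streaming segment stays in Streaming status until it reaches the configured handoff size or streaming on the table is finished explicitly, after which it is expected to show Streaming Finish and then Success once handed off to the columnar format. A minimal sketch, assuming the 'close_streaming' compaction described in the CarbonData streaming guide is available in this build:

// Assumption: 'close_streaming' compaction is supported by this CarbonData
// build (it is documented in the streaming guide). It finishes the current
// streaming segment and hands it off to the columnar format.
carbon.sql("ALTER TABLE uniqdata_stream COMPACT 'close_streaming'")

// After the handoff, the segment status is expected to read
// "Streaming Finish" / "Success" rather than "Streaming".
carbon.sql("show segments for table uniqdata_stream").show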
exists uniqdata_part") carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')"); import carbon.sqlContext.implicits._ import org.apache.spark.sql.types._ val uniqdataSch = StructType( Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType))) val streamDf = carbon.readStream .schema(uniqdataSch) .option("sep", ",") .csv("file:///home/geetika/Downloads/uniqdata") val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds")) .option("checkpointLocation","/stream/uniq") .option("dbName", "default") .option("tableName", "uniqdata_stream") .start() qry.awaitTermination() //Press ctrl+c to terminate start the spark shell again import org.apache.spark.sql.SparkSession import org.apache.spark.sql.CarbonSession._ val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp") carbon.sql("show segments for table uniqdata_stream").show It shows the following output: +-----------------+---------+--------------------+-------------+---------+-----------+ |SegmentSequenceId| Status| Load Start Time|Load End Time|Merged To|File Format| +-----------------+---------+--------------------+-------------+---------+-----------+ | 0|Streaming|2018-01-05 18:23:...| null| NA| ROW_V1| +-----------------+---------+--------------------+-------------+---------+-----------+ Status for the segment is not updated > Streaming segment status is not getting updated to finished or success > ---------------------------------------------------------------------- > > Key: CARBONDATA-2002 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2002 > Project: CarbonData > Issue Type: Bug > Components: data-load > Affects Versions: 1.3.0 > Environment: spark2.1 > Reporter: Geetika Gupta > Fix For: 1.3.0 > > Attachments: 2000_UniqData.csv > > > I created a streaming table and loaded data into it using the following commands on spark shell: > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.CarbonSession._ > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} > val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp") > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") > carbon.sql("drop table if exists uniqdata_stream") > carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)