GitHub user BJangir opened a pull request:
https://github.com/apache/carbondata/pull/2703

    [CARBONDATA-2925] Wrong data displayed for spark file format if carbon…

    Issue: if a carbon file has multiple blocklets, a select query displays wrong data.

    Root cause: only the records of the first blocklet are returned; every other blocklet in the block is skipped. This happens because the default blockletId is 0 and CarbonFileFormat creates the BlockletInfo with that default configuration (the blockletId is never changed).

    Solution: set the default blockletId to -1 so that all blocklets are considered.

    Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

     - [ ] Any interfaces changed? NA
     - [ ] Any backward compatibility impacted? NA
     - [ ] Document update required? NA
     - [ ] Testing done
           Please provide details on
           - Whether new unit test cases have been added or why no new tests are required?
           - How it is tested? Please attach test report.
             Tested manually, and a test case has also been added.
           - Is it a performance related change? Please attach the performance test report.
           - Any additional information to help reviewers in testing this change.
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BJangir/incubator-carbondata CARBONDATA-2925

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2703.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2703

----
commit 6d57c2b52a1c13828262738a82d9fbdca2424c02
Author: BJangir <babulaljangir111@...>
Date:   2018-09-10T14:05:33Z

    [CARBONDATA-2925] Wrong data displayed for spark file format if carbon file has multiple blocklet

----
---
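[Editor's note] For readers skimming the thread, here is a minimal Scala sketch of the sentinel semantics the fix describes. It is an illustration only, not the actual CarbonData code: BlockletDetailInfo and blockletId are the real names from the diffs discussed below, but the Sketch types and the selectBlocklets helper are hypothetical.

    // Hypothetical sketch of the sentinel semantics: -1 means "no specific
    // blocklet was requested, so consider every blocklet in the block".
    case class BlockletDetailInfoSketch(blockletId: Short = -1)

    def selectBlocklets(info: BlockletDetailInfoSketch, blockletIds: Seq[Int]): Seq[Int] =
      if (info.blockletId == -1) blockletIds                  // all blocklets
      else blockletIds.filter(_ == info.blockletId)           // just the requested one

    // Before the fix the default was effectively 0, so only blocklet 0 was read:
    //   selectBlocklets(BlockletDetailInfoSketch(0), Seq(0, 1, 2))  => Seq(0)
    // With a -1 default, every blocklet in the block is considered:
    //   selectBlocklets(BlockletDetailInfoSketch(), Seq(0, 1, 2))   => Seq(0, 1, 2)

---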
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/207/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/210/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8449/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/379/ ---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216412232

    --- Diff: integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---
    @@ -320,6 +324,66 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         assert(new File(filePath).exists())
         cleanTestData()
       }
    +  test("Read data having multi blocklet ") {
    +    buildTestDataMuliBlockLet(700000)
    +    assert(new File(writerPath).exists())
    +    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    +
    +    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
    +      //data source file format
    +      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
    +    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
    +      //data source file format
    +      spark.sql(
    +        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
    +           |'$writerPath' """.stripMargin)
    +    } else{
    +      // TO DO
    --- End diff --

    The build will fail for Spark 2.3: keep only the 2.1 check in the if and move the remaining versions into the else case.

---
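[Editor's note] A sketch of the restructuring being suggested here, assuming the test's spark session, SparkUtil, and writerPath from the diff above are in scope. With only the 2.1 branch in the if, a Spark 2.3 build no longer falls into the empty TO DO branch:

    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      // Spark 2.1 only accepts the OPTIONS (PATH ...) syntax
      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
    } else {
      // Spark 2.2 and above (including 2.3) use the LOCATION syntax
      spark.sql(
        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
           |'$writerPath' """.stripMargin)
    }

---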
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216412941

    --- Diff: integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---
    @@ -320,6 +324,66 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         assert(new File(filePath).exists())
         cleanTestData()
       }
    +  test("Read data having multi blocklet ") {
    +    buildTestDataMuliBlockLet(700000)
    +    assert(new File(writerPath).exists())
    +    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    +
    +    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
    +      //data source file format
    +      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
    +    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
    +      //data source file format
    +      spark.sql(
    +        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
    +           |'$writerPath' """.stripMargin)
    +    } else{
    +      // TO DO
    +    }
    +    //org.apache.spark.SparkException: Index file not present to read the carbondata file
    +    spark.sql("select count(*) from sdkOutputTable").show(false)
    +    val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(700001)))
    +    if(result.isDefined){
    +      assert(false,result.get)
    +    }
    +    spark.sql("DROP TABLE sdkOutputTable")
    +    // drop table should not delete the files
    +    assert(new File(writerPath).exists())
    +    cleanTestData()
    +  }
    +  def buildTestDataMuliBlockLet(records :Int): Unit ={
    +    FileUtils.deleteDirectory(new File(writerPath))
    +    val oldValue=CarbonProperties.getInstance().getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB)
    --- End diff --

    The SDK already supports setting the blocklet size in table properties; please use that.

---
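[Editor's note] The later revision of this test (quoted further down in the thread) does exactly this, so the builder call below is taken from the PR itself rather than invented. The blocklet size is passed per writer through withBlockletSize (value in MB, matching BLOCKLET_SIZE_IN_MB above), which avoids mutating and then restoring the global CarbonProperties value; writerPath, options, and fields are assumed to be in scope from the test:

    // Per-writer blocklet size via the SDK builder, scoped to this writer only,
    // instead of flipping the global CarbonProperties setting.
    val writer = CarbonWriter.builder()
      .outputPath(writerPath)
      .withBlockletSize(16)
      .sortBy(Array("myid", "ingestion_time", "event_id"))
      .withLoadOptions(options)
      .buildWriterForCSVInput(new Schema(fields), spark.sessionState.newHadoopConf())

---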
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216413274

    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---
    @@ -51,7 +51,8 @@

       private short versionNumber;

    -  private short blockletId;
    +  // default blockletId should be -1,which means consider all the blockets in bloack
    --- End diff --

    *block

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/230/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/399/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8469/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/234/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8473/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/403/ ---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216962967

    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---
    @@ -51,7 +51,8 @@

       private short versionNumber;

    -  private short blockletId;
    +  // default blockletId should be -1,which means consider all the blocklets in block
    --- End diff --

    I think it's better not to change the default behavior. Instead, we should optimize the behavior of the SDK reader for the spark file format.

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216963506

    --- Diff: integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---
    @@ -322,6 +328,56 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         assert(new File(filePath).exists())
         cleanTestData()
       }
    +  test("Read data having multi blocklet ") {
    +    buildTestDataMuliBlockLet(700000)
    +    assert(new File(writerPath).exists())
    +    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    +
    +    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
    +      //data source file format
    +      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
    +    } else {
    +      //data source file format
    +      spark.sql(
    +        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
    +           |'$writerPath' """.stripMargin)
    +    }
    +    spark.sql("select count(*) from sdkOutputTable").show(false)
    +    val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(700001)))
    --- End diff --

    Line #331 says the number of records is 700000, so why is the expected result 700001 here? You'd better use 'until' at line #372 to keep them consistent.

---
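[Editor's note] The off-by-one the reviewer is pointing at comes from Scala's inclusive range operator; a two-line illustration:

    // `to` is inclusive, `until` is exclusive:
    (0 to 700000).size     // 700001 -- what the current write loop produces
    (0 until 700000).size  // 700000 -- what the test's record count suggests

---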
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r216963709

    --- Diff: integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---
    @@ -322,6 +328,56 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         assert(new File(filePath).exists())
         cleanTestData()
       }
    +  test("Read data having multi blocklet ") {
    +    buildTestDataMuliBlockLet(700000)
    +    assert(new File(writerPath).exists())
    +    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    +
    +    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
    +      //data source file format
    +      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
    +    } else {
    +      //data source file format
    +      spark.sql(
    +        s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
    +           |'$writerPath' """.stripMargin)
    +    }
    +    spark.sql("select count(*) from sdkOutputTable").show(false)
    +    val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(700001)))
    +    if(result.isDefined){
    +      assert(false,result.get)
    +    }
    +    spark.sql("DROP TABLE sdkOutputTable")
    +    // drop table should not delete the files
    +    assert(new File(writerPath).exists())
    +    cleanTestData()
    +  }
    +  def buildTestDataMuliBlockLet(records :Int): Unit ={
    +    FileUtils.deleteDirectory(new File(writerPath))
    +    val fields=new Array[Field](8)
    +    fields(0)=new Field("myid",DataTypes.INT);
    +    fields(1)=new Field("event_id",DataTypes.STRING);
    +    fields(2)=new Field("eve_time",DataTypes.DATE);
    +    fields(3)=new Field("ingestion_time",DataTypes.TIMESTAMP);
    +    fields(4)=new Field("alldate",DataTypes.createArrayType(DataTypes.DATE));
    +    fields(5)=new Field("subject",DataTypes.STRING);
    +    fields(6)=new Field("from_email",DataTypes.STRING);
    +    fields(7)=new Field("sal",DataTypes.DOUBLE);
    +    import scala.collection.JavaConverters._
    +    try{
    +      val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava
    +      val writer=CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16).sortBy(Array("myid","ingestion_time","event_id")).withLoadOptions(options).buildWriterForCSVInput(new Schema(fields),spark.sessionState.newHadoopConf())
    +      val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +      val date_F=new SimpleDateFormat("yyyy-MM-dd")
    +      for(i<-0 to records){
    +        val time=new Date(System.currentTimeMillis())
    +        writer.write(Array(""+i,"event_"+i,""+date_F.format(time),""+timeF.format(time),""+date_F.format(time)+"$"+date_F.format(time),"Subject_0","FromEmail",""+new Random().nextDouble()))
    +      }
    +      writer.close()
    +    }
    +
    --- End diff --

    Unnecessary blank lines.

---
Github user BJangir commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2703#discussion_r217060418

    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---
    @@ -51,7 +51,8 @@

       private short versionNumber;

    -  private short blockletId;
    +  // default blockletId should be -1,which means consider all the blocklets in block
    --- End diff --

    @xuchuanyin, the default cache level is Block, and for Block we always set the blockletId to -1. So it is better to handle this during initialization rather than in each reader: if any new reader is implemented in the future it will be taken care of automatically; otherwise every reader would have to handle the same thing itself.

---
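[Editor's note] Read as code, the argument is that a sentinel baked in at construction time is inherited by every reader for free. A hypothetical Scala sketch (illustrative names, not the actual CarbonData classes):

    // Hypothetical: the "-1 = all blocklets" default lives in one place,
    // so no individual reader has to remember to set it.
    class DetailInfoSketch {
      var blockletId: Short = -1 // default: consider all blocklets in the block
    }

    trait ReaderSketch {
      // every current and future reader gets the same default automatically
      def newDetailInfo(): DetailInfoSketch = new DetailInfoSketch
    }

---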
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2703 LGTM ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2703 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/260/ ---