[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

GitHub user ravipesala opened a pull request:

    https://github.com/apache/carbondata/pull/1876

    [CARBONDATA-2093] Use small file feature of global sort to minimise the carbondata file count

    Combine the CSV partition files based on their size to reduce the number of CSV RDD partitions, which in turn reduces the CarbonData file count. This reuses the logic and methods of the global sort feature.

    Also refactored the LoadCommand processData method to avoid a lookup in the metastore.
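
    As a minimal sketch of the grouping idea (illustrative names only; the PR reuses the global sort feature's own implementation, not this code): pack files into buckets whose combined size stays under a target split size, so that each bucket becomes a single RDD partition.

        import scala.collection.mutable.ArrayBuffer

        // Hypothetical sketch of size-based file grouping, not the PR's actual code.
        case class CsvFile(path: String, length: Long)

        def groupFilesBySize(files: Seq[CsvFile], targetSplitSize: Long): Seq[Seq[CsvFile]] = {
          val groups = ArrayBuffer[ArrayBuffer[CsvFile]]()
          var current = ArrayBuffer[CsvFile]()
          var currentSize = 0L
          // Sort descending so large files anchor their own groups first.
          for (file <- files.sortBy(f => -f.length)) {
            if (currentSize + file.length > targetSplitSize && current.nonEmpty) {
              groups += current
              current = ArrayBuffer[CsvFile]()
              currentSize = 0L
            }
            current += file
            currentSize += file.length
          }
          if (current.nonEmpty) groups += current
          groups.map(_.toSeq)
        }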
   
    Be sure to complete the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [X] Any interfaces changed?
     
     - [X] Any backward compatibility impacted?
     
     - [X] Document update required?
   
     - [X] Testing done
           Tests added
           
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata partition-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1876
   
----
commit 6f2db90a94f8bc3241034fd15691eedf6da005a3
Author: ravipesala <ravi.pesala@...>
Date:   2018-01-28T15:07:21Z

    Use small file feature of global sort to minimise the carbondata file count

----


---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1988/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3222/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3224/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1990/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3183/



---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164652321
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -588,15 +576,13 @@ case class CarbonLoadDataCommand(
             }
             val len = rowDataTypes.length
             var rdd =
    -          new NewHadoopRDD[NullWritable, StringArrayWritable](
    -            sparkSession.sparkContext,
    -            classOf[CSVInputFormat],
    -            classOf[NullWritable],
    -            classOf[StringArrayWritable],
    -            jobConf).map { case (key, value) =>
    +          DataLoadingUtil.csvFileScanRDD(
    +            sparkSession,
    +            model = carbonLoadModel,
    +            hadoopConf).map { row =>
    --- End diff --
   
    Move `.map` to the next line.
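
    With that suggestion applied, the call would look roughly like this (sketch only; the `.map` body is elided in the diff):

        var rdd =
          DataLoadingUtil.csvFileScanRDD(
            sparkSession,
            model = carbonLoadModel,
            hadoopConf)
            .map { row =>
              row // conversion body elided in the diff
            }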


---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164655879
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---
    @@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
           sql("select * from  casesensitivepartition where empno=17"))
       }
     
    +  test("Partition LOAD with small files") {
    +    sql("DROP TABLE IF EXISTS smallpartitionfiles")
    +    sql(
    +      """
    +        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    val inputPath = new File("target/small_files").getCanonicalPath
    +    val folder = new File(inputPath)
    +    if (folder.exists()) {
    +      FileUtils.deleteDirectory(folder)
    +    }
    +    folder.mkdir()
    +    for (i <- 0 to 100) {
    +      val file = s"$folder/file$i.csv"
    +      val writer = new FileWriter(file)
    +      writer.write("id,name,city,age\n")
    +      writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
    +      writer.close()
    +    }
    +    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
    +    FileUtils.deleteDirectory(folder)
    +    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
    +    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
    +    assert(new File(segmentDir).listFiles().length < 50)
    +  }
    +
    +  test("verify partition read with small files") {
    +    try {
    +      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
    +        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
    +      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
    +      sql(
    +        """
    +          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
    +          | (city STRING)
    +          | STORED BY 'org.apache.carbondata.format'
    +        """.stripMargin)
    +      val inputPath = new File("target/small_files").getCanonicalPath
    +      val folder = new File(inputPath)
    +      if (folder.exists()) {
    +        FileUtils.deleteDirectory(folder)
    +      }
    +      folder.mkdir()
    +      for (i <- 0 until 100) {
    +        val file = s"$folder/file$i.csv"
    +        val writer = new FileWriter(file)
    +        writer.write("id,name,city,age\n")
    +        writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
    +        writer.close()
    +      }
    +      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
    +      FileUtils.deleteDirectory(folder)
    +      val dataFrame = sql("select * from smallpartitionfilesread")
    +      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
    +        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
    +          .asInstanceOf[CarbonScanRDD]
    +      }.head
    +      assert(scanRdd.getPartitions.length < 10)
    +      assertResult(100)(dataFrame.collect().length)
    --- End diff --
   
    Suggest using dataFrame.count.
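
    That is, count the rows on the executors instead of collecting all of them to the driver; a sketch of the suggested assertion:

        // Avoids pulling all 100 rows into driver memory just to count them.
        assertResult(100L)(dataFrame.count())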


---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164671788
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -588,15 +576,13 @@ case class CarbonLoadDataCommand(
             }
             val len = rowDataTypes.length
             var rdd =
    -          new NewHadoopRDD[NullWritable, StringArrayWritable](
    -            sparkSession.sparkContext,
    -            classOf[CSVInputFormat],
    -            classOf[NullWritable],
    -            classOf[StringArrayWritable],
    -            jobConf).map { case (key, value) =>
    +          DataLoadingUtil.csvFileScanRDD(
    +            sparkSession,
    +            model = carbonLoadModel,
    +            hadoopConf).map { row =>
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164671811
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---
    @@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
           sql("select * from  casesensitivepartition where empno=17"))
       }
     
    +  test("Partition LOAD with small files") {
    +    sql("DROP TABLE IF EXISTS smallpartitionfiles")
    +    sql(
    +      """
    +        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    val inputPath = new File("target/small_files").getCanonicalPath
    +    val folder = new File(inputPath)
    +    if (folder.exists()) {
    +      FileUtils.deleteDirectory(folder)
    +    }
    +    folder.mkdir()
    +    for (i <- 0 to 100) {
    +      val file = s"$folder/file$i.csv"
    +      val writer = new FileWriter(file)
    +      writer.write("id,name,city,age\n")
    +      writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
    +      writer.close()
    +    }
    +    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
    +    FileUtils.deleteDirectory(folder)
    +    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
    +    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
    +    assert(new File(segmentDir).listFiles().length < 50)
    +  }
    +
    +  test("verify partition read with small files") {
    +    try {
    +      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
    +        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
    +      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
    +      sql(
    +        """
    +          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
    +          | (city STRING)
    +          | STORED BY 'org.apache.carbondata.format'
    +        """.stripMargin)
    +      val inputPath = new File("target/small_files").getCanonicalPath
    +      val folder = new File(inputPath)
    +      if (folder.exists()) {
    +        FileUtils.deleteDirectory(folder)
    +      }
    +      folder.mkdir()
    +      for (i <- 0 until 100) {
    +        val file = s"$folder/file$i.csv"
    +        val writer = new FileWriter(file)
    +        writer.write("id,name,city,age\n")
    +        writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
    +        writer.close()
    +      }
    +      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
    +      FileUtils.deleteDirectory(folder)
    +      val dataFrame = sql("select * from smallpartitionfilesread")
    +      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
    +        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
    +          .asInstanceOf[CarbonScanRDD]
    +      }.head
    +      assert(scanRdd.getPartitions.length < 10)
    +      assertResult(100)(dataFrame.collect().length)
    --- End diff --
   
    ok


---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3255/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2020/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    retest this please


---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3270/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2034/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3211/



---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164983438
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -94,6 +89,8 @@ case class CarbonLoadDataCommand(
     
       var table: CarbonTable = _
     
    +  var logicalRelation: LogicalRelation = _
    --- End diff --
   
    In some scenarios the case class command needs to be serializable.


---

[GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r165302961
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -94,6 +89,8 @@ case class CarbonLoadDataCommand(
     
       var table: CarbonTable = _
     
    +  var logicalRelation: LogicalRelation = _
    --- End diff --
   
    Yes, I know that. Is it that LogicalRelation cannot be serialized in those scenarios?
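
    A common pattern when a command case class must stay serializable while holding a non-serializable plan node is to mark the field @transient; a sketch of that option (not necessarily what this PR settled on):

        // Excluded from Java serialization; would need to be re-resolved
        // on the driver after deserialization.
        @transient var logicalRelation: LogicalRelation = _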


---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2177/



---

[GitHub] carbondata issue #1876: [CARBONDATA-2093] Use small file feature of global s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1876
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3414/



---