[GitHub] [carbondata] Zhangshunyu commented on a change in pull request #4020: [CARBONDATA-4054] Support data size control for minor compaction

Posted by GitBox on
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Zhangshunyu-opened-a-new-pull-request-4020-CARBONDATA-4054-Support-data-size-contrn-tp103477p104023.html


Zhangshunyu commented on a change in pull request #4020:
URL: https://github.com/apache/carbondata/pull/4020#discussion_r533948673



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,134 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'")
+    // check segment 3 whose size exceed the limit should not be compacted but success
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'")
+    // check segment 3 whose size not exceed the new threshold limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset to -1
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")

Review comment:
       @akashrn5 Oh, I see. Now handled, pls check.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]