
[GitHub] [carbondata] Karan980 commented on a change in pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.

Posted by GitBox on Nov 27, 2020; 8:10am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Karan980-opened-a-new-pull-request-4022-CARBONDATA-4056-Added-global-sort-for-data-tp103521p103732.html


Karan980 commented on a change in pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#discussion_r531439761



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
     identifiedSegments
   }
 
+  /**
+   * This method deletes the old carbondata files.
+   */
+  private def deleteOldCarbonDataFiles(
+      factTimeStamp: Long,
+      validSegments: util.List[Segment],
+      indexCarbonTable: CarbonTable): Unit = {
+    validSegments.asScala.foreach { segment =>
+      val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath,
+        segment.getSegmentNo)
+      val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+        }})
+      dataFiles.foreach { dataFile =>
+        if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) {
+          dataFile.delete()
+        }
+      }
+    }
+  }
+
+  def mergeSISegmentDataFiles(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = {
+    val validSegments = carbonMergerMapping.validSegments.toList
+    val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
+    val proj = indexCarbonTable.getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+    var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
+    val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
+    val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable)
+    CarbonIndexUtil.initializeSILoadModel(outputModel, header)
+    outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+    val segmentMetaDataAccumulator = sparkSession.sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+    validSegments.foreach { segment =>

Review comment:
       It can be handled in a separate PR. I have raised a JIRA ticket for it.
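
The heart of the change quoted above is that merging an SI segment's data files goes through a global sort, so rows end up totally ordered on the index columns across all rewritten files rather than sorted only within each task. Below is a minimal, self-contained sketch of that idea using plain Spark APIs; the paths and sort column names are hypothetical, and the real implementation drives CarbonData's load pipeline through getLoadModelForGlobalSort and the accumulator set up above instead of writing Parquet.

import org.apache.spark.sql.SparkSession

object GlobalSortMergeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("global-sort-merge-sketch")
      .master("local[*]")
      .getOrCreate()
    // Hypothetical input; stands in for the SI segment's existing data files.
    val rows = spark.read.parquet("/tmp/si_segment_input")
    rows
      .sort("indexCol1", "indexCol2") // total order across all partitions
      .coalesce(1)                    // merge into fewer, larger output files
      .write.mode("overwrite")
      .parquet("/tmp/si_segment_merged")
    spark.stop()
  }
}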
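
The deleteOldCarbonDataFiles method in the diff encodes a simple cleanup pattern: once the merged files are written with the new factTimeStamp, any data file whose name carries an older timestamp is stale and can be deleted. Here is a standalone sketch of the same pattern using java.io.File; the file-name format and the timestampFromName helper are assumptions for illustration, whereas the real code relies on FileFactory, CarbonTablePath and DataFileUtil.

import java.io.File

object StaleFileCleanupSketch {
  // Assumed name format: a trailing "-<timestamp>" before the extension,
  // e.g. "part-0-0_batchno0-0-0-1606465785000.carbondata".
  def timestampFromName(name: String): Long =
    name.stripSuffix(".carbondata").split('-').last.toLong

  // Delete every carbondata file in the segment directory whose embedded
  // timestamp predates the current merge's factTimeStamp.
  def deleteOlderThan(segmentDir: File, factTimeStamp: Long): Unit =
    Option(segmentDir.listFiles())
      .getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".carbondata"))
      .filter(f => timestampFromName(f.getName) < factTimeStamp)
      .foreach(_.delete())
}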



