Posted by GitBox on Nov 27, 2020; 3:24am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Karan980-opened-a-new-pull-request-4022-CARBONDATA-4056-Added-global-sort-for-data-tp103521p103688.html
ajantha-bhat commented on a change in pull request #4022:
URL:
https://github.com/apache/carbondata/pull/4022#discussion_r531364008

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
     identifiedSegments
   }
+  /**
+   * This method deletes the old carbondata files.
+   */
+  private def deleteOldCarbonDataFiles(factTimeStamp: Long,
+      validSegments: util.List[Segment],
+      indexCarbonTable: CarbonTable): Unit = {
+    validSegments.asScala.foreach { segment =>
+      val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath,
+        segment.getSegmentNo)
+      val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+        }})
+      dataFiles.foreach(dataFile =>
+        if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) {
+          dataFile.delete()
+        })
+    }
+  }
+
+  def mergeSISegmentDataFiles(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = {
+    val validSegments = carbonMergerMapping.validSegments.toList
+    val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
+    val proj = indexCarbonTable.getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+    var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
+    val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
+    val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable)
+    CarbonIndexUtil.initializeSILoadModel(outputModel, header)
+    outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+    val segmentMetaDataAccumulator = sparkSession.sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+    validSegments.foreach { segment =>
Review comment:
This can be a Spark job over multiple segments; handling them sequentially is bad for performance.
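
A minimal sketch of what the reviewer suggests, assuming the per-segment work can be expressed as a serializable function: launch one Spark task per segment and collect the status tuples, instead of iterating on the driver. ParallelSegmentMerge, mergeSegmentsInParallel, and the map body are hypothetical stand-ins, not the PR's actual implementation.

import org.apache.spark.sql.SparkSession

object ParallelSegmentMerge {

  // Fan the per-segment merge out as one Spark task per segment,
  // replacing the driver-side validSegments.foreach loop.
  def mergeSegmentsInParallel(sparkSession: SparkSession,
      segmentNos: Seq[String]): Array[((String, Boolean), String)] = {
    sparkSession.sparkContext
      .parallelize(segmentNos, segmentNos.length) // one partition per segment
      .map { segmentNo =>
        // Hypothetical executor-side body: the real code would merge this
        // segment's data files and report its status here.
        ((segmentNo, true), s"merged_$segmentNo")
      }
      .collect() // gathers the ((segmentId, status), fileName) tuples on the driver
  }
}

The collect() at the end plays the role of the mergeStatus buffer in the sequential version; only the segment numbers cross the driver/executor boundary, which keeps the closure trivially serializable.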
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]