Posted by GitBox on Jan 29, 2021; 1:59pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Karan980-opened-a-new-pull-request-4070-CARBONDATA-4082-Fix-alter-table-add-segmen-tp105251p105799.html
Karan980 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r566840116
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
     }
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+      .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+    if (deltaFiles.length > 0) {
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      val timestamp = System.currentTimeMillis().toString
+      val segmentDetails = new util.HashSet[Segment]()
+      segmentDetails.add(model.getSegment)
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, timestamp, false)
Review comment:
I analyzed this and found that, before writing the new tableUpdateStatus file, we look up the segment entries in the tableStatus file and write data to the tableUpdateStatus file only for segments that are present there. So if the tableStatus file has no entry for the segment we are adding now, the corresponding entry will not be written to the new tableUpdateStatus file either. For this scenario, the tableStatus file therefore has to be written twice.
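
The ordering dependency described above can be illustrated with a small, self-contained Scala sketch. It is a simplified model and not CarbonData code: `StatusFiles`, `UpdateDetail`, `writeTableStatus` and `writeUpdateStatus` are hypothetical names, and the two status files are represented as in-memory collections. It only shows why the segment has to be registered in tableStatus before the update status is written; it does not model what the second tableStatus write records.

```scala
// Simplified, hypothetical model of the behaviour described in the comment.
// None of these names are CarbonData classes or methods.
object TwoPhaseStatusWrite {
  final case class UpdateDetail(segmentId: String, blockName: String)

  // In-memory stand-ins for the tableStatus and tableUpdateStatus files.
  final case class StatusFiles(
      tableStatus: Set[String] = Set.empty,
      updateStatus: List[UpdateDetail] = Nil)

  // Assumed behaviour: only details whose segment is already listed in
  // tableStatus are written to the update status; the rest are dropped.
  def writeUpdateStatus(files: StatusFiles, details: List[UpdateDetail]): StatusFiles = {
    val kept = details.filter(d => files.tableStatus.contains(d.segmentId))
    files.copy(updateStatus = files.updateStatus ++ kept)
  }

  // Registers a segment in tableStatus (the first of the two writes).
  def writeTableStatus(files: StatusFiles, segmentId: String): StatusFiles =
    files.copy(tableStatus = files.tableStatus + segmentId)

  def main(args: Array[String]): Unit = {
    val details = List(UpdateDetail("2", "block-a"))

    // Update status written before the segment is registered: entry is lost.
    val dropped = writeUpdateStatus(StatusFiles(), details)
    println(s"without prior tableStatus write: ${dropped.updateStatus.size} entries")

    // Segment registered first: the entry survives the filter.
    val kept = writeUpdateStatus(writeTableStatus(StatusFiles(), "2"), details)
    println(s"with prior tableStatus write: ${kept.updateStatus.size} entries")
  }
}
```

Under this model, the first tableStatus write makes the new segment visible to the update-status writer, and any later tableStatus write is a separate step, which matches the two writes the comment argues for.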