Karan980 opened a new pull request #4022: URL: https://github.com/apache/carbondata/pull/4022 ### Why is this PR needed? Earlier global sort was not supported during data files merge operation of SI segments. So if some SI is created with global sort and value of carbon.si.segment.merge is true, it merges the data files in SI segments but disorder the globally sorted data. ### What changes were proposed in this PR? Added global sort for data files merge operation in SI segments. ### Does this PR introduce any user interface change? - No ### Is any new testcase added? - Yes ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-732902364 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3132/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-732973817 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4886/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734625127 retest this please ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#discussion_r531364008 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ########## @@ -617,4 +637,157 @@ object SecondaryIndexUtil { identifiedSegments } + /** + * This method deletes the old carbondata files. + */ + private def deleteOldCarbonDataFiles(factTimeStamp: Long, + validSegments: util.List[Segment], + indexCarbonTable: CarbonTable): Unit = { + validSegments.asScala.foreach { segment => + val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath, + segment.getSegmentNo) + val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) + }}) + dataFiles.foreach(dataFile => + if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) { + dataFile.delete() + }) + } + } + + def mergeSISegmentDataFiles(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = { + val validSegments = carbonMergerMapping.validSegments.toList + val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) + SparkHadoopUtil.get.addCredentials(jobConf) + val job: Job = new Job(jobConf) + val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo) + val proj = indexCarbonTable.getCreateOrderColumn + .asScala + .map(_.getColName) + .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet + var mergeStatus = ArrayBuffer[((String, Boolean), String)]() + val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024 + val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray + val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable) + CarbonIndexUtil.initializeSILoadModel(outputModel, header) + outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp) + val segmentMetaDataAccumulator = sparkSession.sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] + validSegments.foreach { segment => Review comment: This can be a spark job, for multiple segments. Handling sequentially is bad ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734646180 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3184/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734646663 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4940/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Karan980 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734650646 retest this please ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734682627 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4944/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734683112 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3189/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Karan980 commented on a change in pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#discussion_r531439761 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ########## @@ -617,4 +637,157 @@ object SecondaryIndexUtil { identifiedSegments } + /** + * This method deletes the old carbondata files. + */ + private def deleteOldCarbonDataFiles(factTimeStamp: Long, + validSegments: util.List[Segment], + indexCarbonTable: CarbonTable): Unit = { + validSegments.asScala.foreach { segment => + val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath, + segment.getSegmentNo) + val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) + }}) + dataFiles.foreach(dataFile => + if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) { + dataFile.delete() + }) + } + } + + def mergeSISegmentDataFiles(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = { + val validSegments = carbonMergerMapping.validSegments.toList + val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) + SparkHadoopUtil.get.addCredentials(jobConf) + val job: Job = new Job(jobConf) + val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo) + val proj = indexCarbonTable.getCreateOrderColumn + .asScala + .map(_.getColName) + .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet + var mergeStatus = ArrayBuffer[((String, Boolean), String)]() + val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024 + val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray + val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable) + CarbonIndexUtil.initializeSILoadModel(outputModel, header) + outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp) + val segmentMetaDataAccumulator = sparkSession.sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] + validSegments.foreach { segment => Review comment: It can be handled in a separate PR. i have raised a jira ticket for it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734788102 LGTM. Please handle the problem mentioned in another PR soon, else we can't use the si merge for global sort for old tables. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
asfgit closed pull request #4022: URL: https://github.com/apache/carbondata/pull/4022 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#discussion_r531546338 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ########## @@ -617,4 +637,157 @@ object SecondaryIndexUtil { identifiedSegments } + /** + * This method deletes the old carbondata files. + */ + private def deleteOldCarbonDataFiles(factTimeStamp: Long, + validSegments: util.List[Segment], + indexCarbonTable: CarbonTable): Unit = { + validSegments.asScala.foreach { segment => + val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath, + segment.getSegmentNo) + val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) + }}) + dataFiles.foreach(dataFile => + if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) { + dataFile.delete() + }) + } + } + + def mergeSISegmentDataFiles(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = { + val validSegments = carbonMergerMapping.validSegments.toList + val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) + SparkHadoopUtil.get.addCredentials(jobConf) + val job: Job = new Job(jobConf) + val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo) + val proj = indexCarbonTable.getCreateOrderColumn + .asScala + .map(_.getColName) + .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet + var mergeStatus = ArrayBuffer[((String, Boolean), String)]() + val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024 + val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray + val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable) + CarbonIndexUtil.initializeSILoadModel(outputModel, header) + outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp) + val segmentMetaDataAccumulator = sparkSession.sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] + validSegments.foreach { segment => Review comment: please reply the JIRA id also here ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Karan980 commented on a change in pull request #4022: URL: https://github.com/apache/carbondata/pull/4022#discussion_r531636655 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ########## @@ -617,4 +637,157 @@ object SecondaryIndexUtil { identifiedSegments } + /** + * This method deletes the old carbondata files. + */ + private def deleteOldCarbonDataFiles(factTimeStamp: Long, + validSegments: util.List[Segment], + indexCarbonTable: CarbonTable): Unit = { + validSegments.asScala.foreach { segment => + val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath, + segment.getSegmentNo) + val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) + }}) + dataFiles.foreach(dataFile => + if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) { + dataFile.delete() + }) + } + } + + def mergeSISegmentDataFiles(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = { + val validSegments = carbonMergerMapping.validSegments.toList + val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) + SparkHadoopUtil.get.addCredentials(jobConf) + val job: Job = new Job(jobConf) + val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo) + val proj = indexCarbonTable.getCreateOrderColumn + .asScala + .map(_.getColName) + .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet + var mergeStatus = ArrayBuffer[((String, Boolean), String)]() + val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024 + val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray + val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable) + CarbonIndexUtil.initializeSILoadModel(outputModel, header) + outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp) + val segmentMetaDataAccumulator = sparkSession.sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] + validSegments.foreach { segment => Review comment: 4060 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |