CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739715250 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5074/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273799 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ########## @@ -19,13 +19,13 @@ package org.apache.carbondata.api import java.io.{DataInputStream, FileNotFoundException, InputStreamReader} import java.time.{Duration, Instant} -import java.util import java.util.{Collections, Comparator} import scala.collection.JavaConverters._ import scala.util.control.Breaks.{break, breakable} import com.google.gson.Gson +import java.util Review comment: reverted ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273944 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s } Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName) .collect(Collectors.toSet()); - List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains( - DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList()); + // get all stale segment files, not include compaction segments Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717393 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3316/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717520 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5097/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275885 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: If auto compaction fails, the current load will still be passed, so we can trigger pre-priming. The reason I suggested doing it after auto compaction is the success case: then there is no need to pre-prime the current segment, as it will become MFD if it goes through auto compaction. So we may be able to save pre-priming one segment. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275990 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) + // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) + } finally { + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = { + if (isForceDelete) { + // empty the trash folder + TrashUtil.emptyTrash(carbonTable.getTablePath) + } else { + // clear trash based on timestamp + TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath) + } + } + + /** + * move stale segment to trash folder, but not include compaction segment + */ + private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = { + if (carbonTable.isHivePartitionTable) { + CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable) + } else { + CleanFilesUtil.cleanStaleSegments(carbonTable) + } + } + + private def cleanExpiredSegments( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecsOption: 
Option[Seq[PartitionSpec]]): Unit = { + val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, + isForceDelete, partitionSpecs, cleanStaleInProgress, true) + if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) { + SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete) + } + } + + /** + * clean the stale compact segment immediately after compaction failure + */ + def cleanStaleCompactionSegment( + carbonTable: CarbonTable, + mergedLoadName: String, + factTimestamp: Long, + partitionSpecs: Option[Seq[PartitionSpec]]): Unit = { + val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath) + val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath) + if (details == null || details.isEmpty) { + return + } + val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName)) + // only clean stale compaction segment + if (loadDetail.isEmpty) { + val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1) + if (carbonTable.isHivePartitionTable) { + if (partitionSpecs.isDefined) { + partitionSpecs.get.foreach { partitionSpec => + cleanStaleCompactionDataFiles( + partitionSpec.getLocation.toString, segmentId, factTimestamp) + } + } + } else { + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + cleanStaleCompactionDataFiles( + segmentPath, segmentId, factTimestamp) + } + } + } + + private def cleanStaleCompactionDataFiles( + folderPath: String, + segmentId: String, + factTimestamp: Long): Unit = { + if (FileFactory.isFileExist(folderPath)) { + val namePart = CarbonCommonConstants.HYPHEN + segmentId + + CarbonCommonConstants.HYPHEN + factTimestamp + val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(namePart) + } + }) + if (toBeDelete != null && 
toBeDelete.nonEmpty) { + try { + CarbonUtil.deleteFoldersAndFilesSilent(toBeDelete: _*) + } catch { + case e: Throwable => + LOGGER.error("Exception in deleting the delta files." + e) Review comment: Fixed the exception message. Cleaning data before compaction is a good suggestion, but it may mistakenly remove data files (which are created by other jobs). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276094 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s } Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName) .collect(Collectors.toSet()); - List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains( - DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList()); + // get all stale segment files, not include compaction segments + List<String> staleSegments = segmentFiles.stream() + .filter(segmentFile -> !DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains( + CarbonCommonConstants.POINT)) + .filter(segmentFile -> !loadNameSet.contains( Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276344 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -263,15 +249,7 @@ object CarbonDataRDDFactory { throw new Exception("Exception in compaction " + exception.getMessage) } } finally { - executor.shutdownNow() - try { - compactor.deletePartialLoadsInCompaction() Review comment: OK, I see that many of the changes were made to support `DataTrashManager.cleanStaleCompactionSegment`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277121 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ########## @@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String tableName) { * folder will take place */ private void validateTrashFolderRetentionTime() { - String propertyValue = carbonProperties.getProperty(CarbonCommonConstants - .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants - .CARBON_TRASH_RETENTION_DAYS_DEFAULT)); + String propertyValue = carbonProperties.getProperty( Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s } Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName) .collect(Collectors.toSet()); - List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains( - DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList()); + // get all stale segment files, not include compaction segments Review comment: done ########## File path: docs/clean-files.md ########## @@ -38,6 +38,9 @@ The above clean files command will clean Marked For Delete and Compacted segment ``` Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command. +**NOTE**: + * In trash folder, the retention time is "carbon.trash.retention.days" + * Outside trash folder, the retention time is max value of two properties("carbon.trash.retention.days", "max.query.execution.time") Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277390 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) + // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) + } finally { + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = { + if (isForceDelete) { + // empty the trash folder + TrashUtil.emptyTrash(carbonTable.getTablePath) + } else { + // clear trash based on timestamp + TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath) + } + } + + /** + * move stale segment to trash folder, but not include compaction segment + */ + private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = { + if (carbonTable.isHivePartitionTable) { + CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable) + } else { + CleanFilesUtil.cleanStaleSegments(carbonTable) + } + } + + private def cleanExpiredSegments( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecsOption: 
Option[Seq[PartitionSpec]]): Unit = { + val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, + isForceDelete, partitionSpecs, cleanStaleInProgress, true) + if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) { + SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete) + } + } + + /** + * clean the stale compact segment immediately after compaction failure + */ + def cleanStaleCompactionSegment( + carbonTable: CarbonTable, + mergedLoadName: String, + factTimestamp: Long, + partitionSpecs: Option[Seq[PartitionSpec]]): Unit = { + val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath) + val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath) + if (details == null || details.isEmpty) { + return + } + val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName)) + // only clean stale compaction segment + if (loadDetail.isEmpty) { + val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1) + if (carbonTable.isHivePartitionTable) { + if (partitionSpecs.isDefined) { + partitionSpecs.get.foreach { partitionSpec => + cleanStaleCompactionDataFiles( + partitionSpec.getLocation.toString, segmentId, factTimestamp) + } + } + } else { + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + cleanStaleCompactionDataFiles( + segmentPath, segmentId, factTimestamp) + } + } + } + + private def cleanStaleCompactionDataFiles( + folderPath: String, + segmentId: String, + factTimestamp: Long): Unit = { + if (FileFactory.isFileExist(folderPath)) { + val namePart = CarbonCommonConstants.HYPHEN + segmentId + + CarbonCommonConstants.HYPHEN + factTimestamp + val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() { Review comment: accepted ---------------------------------------------------------------- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277508 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) + // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) + } finally { + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = { + if (isForceDelete) { + // empty the trash folder + TrashUtil.emptyTrash(carbonTable.getTablePath) + } else { + // clear trash based on timestamp + TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath) + } + } + + /** + * move stale segment to trash folder, but not include compaction segment + */ + private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = { + if (carbonTable.isHivePartitionTable) { + CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable) + } else { + CleanFilesUtil.cleanStaleSegments(carbonTable) + } + } + + private def cleanExpiredSegments( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecsOption: 
Option[Seq[PartitionSpec]]): Unit = { + val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, + isForceDelete, partitionSpecs, cleanStaleInProgress, true) + if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) { + SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete) + } + } + + /** + * clean the stale compact segment immediately after compaction failure + */ + def cleanStaleCompactionSegment( + carbonTable: CarbonTable, + mergedLoadName: String, + factTimestamp: Long, + partitionSpecs: Option[Seq[PartitionSpec]]): Unit = { + val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath) + val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath) + if (details == null || details.isEmpty) { + return + } + val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName)) Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277613 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala ########## @@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { event match { case cleanFilesPostEvent: CleanFilesPostEvent => LOGGER.info("Clean files post event listener called") - val carbonTable = cleanFilesPostEvent.carbonTable - val indexTables = CarbonIndexUtil - .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession) - val isForceDelete = cleanFilesPostEvent.ifForceDelete - val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress - indexTables.foreach { indexTable => - val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( - Seq.empty[Expression], - cleanFilesPostEvent.sparkSession, - indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean, - true) - CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) - } + cleanFilesForIndex( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean, + cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean) + + cleanFilesForMv( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options) + } + } + + private def cleanFilesForIndex( + sparkSession: SparkSession, + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean + ): Unit = { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277683 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala ########## @@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { event match { case cleanFilesPostEvent: CleanFilesPostEvent => LOGGER.info("Clean files post event listener called") - val carbonTable = cleanFilesPostEvent.carbonTable - val indexTables = CarbonIndexUtil - .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession) - val isForceDelete = cleanFilesPostEvent.ifForceDelete - val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress - indexTables.foreach { indexTable => - val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( - Seq.empty[Expression], - cleanFilesPostEvent.sparkSession, - indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean, - true) - CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) - } + cleanFilesForIndex( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean, + cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean) + + cleanFilesForMv( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options) + } + } + + private def cleanFilesForIndex( + sparkSession: SparkSession, + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean + ): Unit = { + val indexTables = CarbonIndexUtil + .getIndexCarbonTables(carbonTable, sparkSession) + indexTables.foreach { indexTable => + val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( + 
Seq.empty[Expression], + sparkSession, + indexTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress, + true) + cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) + } + } + + private def cleanFilesForMv( + sparkSession: SparkSession, + carbonTable: CarbonTable, + options: Map[String, String] + ): Unit = { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278197 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala ########## @@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { event match { case cleanFilesPostEvent: CleanFilesPostEvent => LOGGER.info("Clean files post event listener called") - val carbonTable = cleanFilesPostEvent.carbonTable - val indexTables = CarbonIndexUtil - .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession) - val isForceDelete = cleanFilesPostEvent.ifForceDelete - val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress - indexTables.foreach { indexTable => - val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( - Seq.empty[Expression], - cleanFilesPostEvent.sparkSession, - indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean, - true) - CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) - } + cleanFilesForIndex( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean, + cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean) + + cleanFilesForMv( + cleanFilesPostEvent.sparkSession, + cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.options) + } + } + + private def cleanFilesForIndex( + sparkSession: SparkSession, + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean + ): Unit = { + val indexTables = CarbonIndexUtil + .getIndexCarbonTables(carbonTable, sparkSession) + indexTables.foreach { indexTable => + val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( + 
Seq.empty[Expression], + sparkSession, + indexTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress, + true) + cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) + } + } + + private def cleanFilesForMv( + sparkSession: SparkSession, + carbonTable: CarbonTable, + options: Map[String, String] + ): Unit = { + val viewSchemas = MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable) + if (!viewSchemas.isEmpty) { + viewSchemas.asScala.map { schema => Review comment: Fixed. The following command was using the wrong table; I changed it to use this variable: schema ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278313 ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) { } /** - * This will tell whether the trash retention time has expired or not - * - * @param fileTimestamp - * @return + * whether trash data inside of .Trash folder is time out Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) { } /** - * This will tell whether the trash retention time has expired or not - * - * @param fileTimestamp - * @return + * whether trash data inside of .Trash folder is time out + */ + private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) { + int retentionDays = CarbonProperties.getInstance().getTrashFolderRetentionTime(); + long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays; + return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > retentionMilliSeconds; + } + + /** + * whether trash data outside of .Trash folder is time out */ - public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) { - // record current time. - long currentTime = CarbonUpdateUtil.readCurrentTime(); - long retentionMilliSeconds = (long)Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer.toString( - CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * TimeUnit.DAYS - .toMillis(1); - long difference = currentTime - fileTimestamp; - return difference > retentionMilliSeconds; + public static boolean isTrashDataTimeout(long fileTimestamp) { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281123 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -263,15 +249,7 @@ object CarbonDataRDDFactory { throw new Exception("Exception in compaction " + exception.getMessage) } } finally { - executor.shutdownNow() - try { - compactor.deletePartialLoadsInCompaction() Review comment: Code has already been added to handle the compaction exception instead of this function. This function lists the whole table to clean partial loads, whereas the new function only focuses on the current load. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281516 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: OK, we can do it in another PR. It is also needed here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537282367 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) + // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) Review comment: ```suggestion checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281516 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: OK, we can do it in another PR. Compaction may not compact this segment, so it is also needed here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |