ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537283710 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) + // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs) + } finally { + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = { + if (isForceDelete) { + // empty the trash folder + TrashUtil.emptyTrash(carbonTable.getTablePath) + } else { + // clear trash based on timestamp + TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath) + } + } + + /** + * move stale segment to trash folder, but not include compaction segment Review comment: ```suggestion * move stale segment to trash folder, but not include stale compaction (x.y) segment ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
akashrn5 commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537288377 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: Calling pre-priming two times will increase the load time. It would be better to have logic that finds out whether compaction happened and, based on that, sends the segments to pre-prime only once. Also, in `DistributedRDDUtils.scala`, line number 376, a new `SegmentUpdateStatusManager` is created but not used; it simply reads the table status file and the update status. Please check whether it can be removed. This is just another input for optimization. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537289240 ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) { } - /** - * Handling of the clean up of old carbondata files, index files , delete delta, - * update status files. - * @param table clean up will be handled on this table. - * @param forceDelete if true then max query execution timeout will not be considered. - */ - public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException { - - SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); - - LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(table.getMetadataPath()); - - SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table); - SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails(); - // hold all the segments updated so that wen can check the delta files in them, ne need to - // check the others. - Set<String> updatedSegments = new HashSet<>(); - for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) { - updatedSegments.add(updateDetails.getSegmentName()); - } - - String validUpdateStatusFile = ""; - - boolean isAbortedFile = true; - - boolean isInvalidFile = false; - - // take the update status file name from 0th segment. - validUpdateStatusFile = ssm.getUpdateStatusFileName(details); - // scan through each segment. - for (LoadMetadataDetails segment : details) { - // if this segment is valid then only we will go for delta file deletion. - // if the segment is mark for delete or compacted then any way it will get deleted. 
- if (segment.getSegmentStatus() == SegmentStatus.SUCCESS - || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { - // when there is no update operations done on table, then no need to go ahead. So - // just check the update delta start timestamp and proceed if not empty - if (!segment.getUpdateDeltaStartTimestamp().isEmpty() - || updatedSegments.contains(segment.getLoadName())) { - // take the list of files from this segment. - String segmentPath = CarbonTablePath.getSegmentPath( - table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName()); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath); - CarbonFile[] allSegmentFiles = segDir.listFiles(); - - // now handle all the delete delta files which needs to be deleted. - // there are 2 cases here . - // 1. if the block is marked as compacted then the corresponding delta files - // can be deleted if query exec timeout is done. - // 2. if the block is in success state then also there can be delete - // delta compaction happened and old files can be deleted. - - SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata(); - for (SegmentUpdateDetails block : updateDetails) { - CarbonFile[] completeListOfDeleteDeltaFiles; - CarbonFile[] invalidDeleteDeltaFiles; - - if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) { - continue; - } - - // aborted scenario. 
- invalidDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, false, - allSegmentFiles, isAbortedFile); - for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { - boolean doForceDelete = true; - compareTimestampsAndDelete(invalidFile, doForceDelete, false); - } - - // case 1 - if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { - completeListOfDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, true, - allSegmentFiles, isInvalidFile); - for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); - } - - } else { - invalidDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, false, - allSegmentFiles, isInvalidFile); - for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); - } - } - } - } - // handle cleanup of merge index files and data files after small files merge happened for - // SI table - cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment); - } - } - - // delete the update table status files which are old. - if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) { - - final String updateStatusTimestamp = validUpdateStatusFile - .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1); - - String tablePath = table.getAbsoluteTableIdentifier().getTablePath(); - CarbonFile metaFolder = FileFactory.getCarbonFile( - CarbonTablePath.getMetadataPath(tablePath)); - - CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) { - // CHECK if this is valid or not. - // we only send invalid ones to delete. 
- return !file.getName().endsWith(updateStatusTimestamp); - } - return false; - } - }); - - for (CarbonFile invalidFile : invalidUpdateStatusFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, true); - } - } - } - - /** - * this is the clean up added specifically for SI table, because after we merge the data files - * inside the secondary index table, we need to delete the stale carbondata files. - * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD - */ - private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table, Review comment: Done. See [Should clean stale data in success segments](https://issues.apache.org/jira/browse/CARBONDATA-4074). Cleaning stale data in success segments includes the following parts: 1. clean stale delete delta (when force is true); 2. clean stale small files for index table; 3. clean stale data files for loading/compaction. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537293803 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) Review comment: I saw that both moveStaleSegmentsToTrash and cleanExpiredSegments call `SegmentStatusManager.readLoadMetadata`. Is it possible to combine the two methods so that we read the table status once and then, if a segment is stale, we move it to trash, and if it is expired, we delete it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537293803 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) Review comment: I saw that moveStaleSegmentsToTrash and cleanExpiredSegments call `SegmentStatusManager.readLoadMetadata` multiple times. Is it possible to combine the two methods so that we read the table status once and then, if a segment is stale, we move it to trash, and if it is expired, we delete it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537294489 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/events/package.scala ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata + +package object events { + def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = { Review comment: done [Should refactor carbon to use withEvents instead of fireEvent](https://issues.apache.org/jira/browse/CARBONDATA-4075) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537294489 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/events/package.scala ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata + +package object events { + def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = { Review comment: done [Should refactor to use withEvents instead of fireEvent](https://issues.apache.org/jira/browse/CARBONDATA-4075) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739745639 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3341/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739745898 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5076/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537302169 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. check and clean .Trash folder + * 2. move stale segments without metadata into .Trash + * 3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param isForceDelete clean the MFD/Compacted segments immediately and empty trash folder + * @param cleanStaleInProgress clean the In Progress segments based on retention time, + * it will clean immediately when force is true + */ + def cleanGarbageData( + carbonTable: CarbonTable, + isForceDelete: Boolean, + cleanStaleInProgress: Boolean, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false + if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) { + LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" + + " recovered. 
It is disabled by default, to enable clean files with force option," + + " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true") + throw new RuntimeException("Clean files with force operation not permitted by default") + } + var carbonCleanFilesLock: ICarbonLock = null + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: check and clean trash folder + checkAndCleanTrashFolder(carbonTable, isForceDelete) + // step 2: move stale segments which are not exists in metadata into .Trash + moveStaleSegmentsToTrash(carbonTable) Review comment: The method moveStaleSegmentsToTrash needs to call readLoadMetadata after listFiles. An optional solution is for moveStaleSegmentsToTrash to return a LoadMetadata array. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739749809 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3318/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537305840 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: ok, can you raise another pr to fix it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537305840 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -577,38 +556,46 @@ object CarbonDataRDDFactory { LOGGER.info("Data load is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } - - // code to handle Pre-Priming cache for loading - - if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { - DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), - operationContext, hadoopConf, List(carbonLoadModel.getSegmentId)) - } - try { - // compaction handling - if (carbonTable.isHivePartitionTable) { - carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) - } - val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) - writtenSegment - } catch { - case e: Exception => - LOGGER.error( - "Auto-Compaction has failed. 
Ignoring this exception because the" + - " load is passed.", e) - writtenSegment - } + isLoadingCommitted = true + writtenSegment } } finally { // Release the segment lock, once table status is finally updated segmentLock.unlock() + if (isLoadingCommitted) { + triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext) + } + } + } + + private def triggerEventsAfterLoading( + sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // code to handle Pre-Priming cache for loading + if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) { + DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(), Review comment: OK, can you raise another PR to fix it? This PR only focuses on clean files. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739772818 retest this please ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537330003 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala ########## @@ -48,30 +50,59 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { event match { case cleanFilesPostEvent: CleanFilesPostEvent => LOGGER.info("Clean files post event listener called") - val carbonTable = cleanFilesPostEvent.carbonTable - val indexTables = CarbonIndexUtil - .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession) - val isForceDelete = cleanFilesPostEvent.ifForceDelete - val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress - indexTables.foreach { indexTable => - val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( - Seq.empty[Expression], - cleanFilesPostEvent.sparkSession, - indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean, - true) - CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) - } + cleanFilesForIndex( Review comment: Would it be better if we call cleanFilesCommand for each index table, like how MV is handled further down? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739778382 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739780017 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5101/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739780939 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5077/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739781549 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3320/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #4044: URL: https://github.com/apache/carbondata/pull/4044#discussion_r537357883 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala ########## @@ -48,30 +50,59 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { event match { case cleanFilesPostEvent: CleanFilesPostEvent => LOGGER.info("Clean files post event listener called") - val carbonTable = cleanFilesPostEvent.carbonTable - val indexTables = CarbonIndexUtil - .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession) - val isForceDelete = cleanFilesPostEvent.ifForceDelete - val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress - indexTables.foreach { indexTable => - val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions( - Seq.empty[Expression], - cleanFilesPostEvent.sparkSession, - indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean, - true) - CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) - } + cleanFilesForIndex( Review comment: OK, I will refactor it in a new PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |