Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702 retest this please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2257/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1034/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1702 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2512/
---
Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702 retest this please
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1702 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2513/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2261/
---
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158474524

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -178,6 +178,7 @@ case class CarbonLoadDataCommand(
       // First system has to partition the data first and then call the load data
       LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
       GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
+      DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
--- End diff --

Please add the reason why the deletion call has to move from CarbonDataRDDFactory.scala to here.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1039/
---
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158477319

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
@@ -133,16 +138,23 @@ public static boolean deleteLoadFoldersFromFileSystem(
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
           String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
--- End diff --

1) Please add a log for the lock-acquire status.
2) Please re-check the latest status in case the load currently in progress has just completed; otherwise we may accidentally delete a completed load.
3) Can we use the lock without retries? This work is only cleanup, and retries may make the clean files command very slow due to the new lock.
---
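To make the three points concrete, here is a minimal sketch (not the PR's code) of a deletion step that logs the lock-acquire status and re-reads the segment status under the lock before deleting. It is meant to live inside DeleteLoadFolders so the existing helpers (LOGGER, checkIfLoadCanBeDeleted, getSegmentPath, physicalFactAndMeasureMetadataDeletion, and the getCurrentLoadStatusOfSegment helper quoted in a later hunk of this thread) resolve; the import paths are assumed from the CarbonData core module of that era.

import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.path.CarbonTablePath;

// Sketch only: delete one load folder the way the review asks for.
static boolean deleteWithSegmentLock(AbsoluteTableIdentifier identifier,
    LoadMetadataDetails oneLoad, String metadataPath, boolean isForceDelete) {
  ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(identifier,
      CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
  try {
    if (segmentLock.lockWithRetries()) {
      // (1) log the lock-acquire status
      LOGGER.info("Acquired segment lock on segment " + oneLoad.getLoadName());
      // (2) re-read the metadata under the lock: a load that was in progress
      // may have just completed, and a completed load must not be deleted
      LoadMetadataDetails latest =
          getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
      if (latest != null && checkIfLoadCanBeDeleted(latest, isForceDelete)) {
        return physicalFactAndMeasureMetadataDeletion(
            getSegmentPath(identifier, 0, latest));
      }
    } else {
      // (1) also log the failure case
      LOGGER.info("Could not acquire segment lock on segment " + oneLoad.getLoadName());
    }
    return false;
  } finally {
    segmentLock.unlock();
  }
}

Point 3 (skipping retries so cleanup stays fast) is picked up again further down the thread.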
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158480429

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
@@ -418,8 +429,11 @@ object CarbonDataRDDFactory {
           errorMessage = errorMessage + ": " + executorMessage
         }
       }
+      segmentLock.unlock()
--- End diff --

Please remove this. unlock in finally is enough
---
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158480479

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
@@ -133,16 +138,23 @@ public static boolean deleteLoadFoldersFromFileSystem(
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
           String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+          if (segmentLock.lockWithRetries()) {
+            boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
+            if (deletionStatus) {
+              isDeleted = true;
+              oneLoad.setVisibility("false");
+              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+            }
+            segmentLock.unlock();
--- End diff --

Please move the unlock to finally.
---
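Both this comment and the previous one point at the same idiom: release the lock in a finally block so it is released exactly once, even when the guarded work throws. A minimal sketch of the pattern, using the lock API and variables from the quoted diff:

ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
try {
  if (segmentLock.lockWithRetries()) {
    // ... delete the load folder while holding the lock ...
  }
} finally {
  // runs on success and on exception, so the lock can never leak
  segmentLock.unlock();
}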
Github user dhatchayani commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158490560

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
(same hunk as quoted above, ending at the added `ICarbonLock segmentLock = ...` line)
--- End diff --

logic is divided
---
Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702 retest sdv please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2271/
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158509584

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
@@ -315,88 +314,100 @@ object CarbonDataRDDFactory {
     val isSortTable = carbonTable.getNumberOfSortColumns > 0
     val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
+    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
+
     try {
-      if (updateModel.isDefined) {
-        res = loadDataFrameForUpdate(
-          sqlContext,
-          dataFrame,
-          carbonLoadModel,
-          updateModel,
-          carbonTable)
-        res.foreach { resultOfSeg =>
-          resultOfSeg.foreach { resultOfBlock =>
-            if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
-              loadStatus = SegmentStatus.LOAD_FAILURE
-              if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-                updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
-              } else {
-                updateModel.get.executorErrors = resultOfBlock._2._2
+      if (segmentLock.lockWithRetries()) {
+        if (updateModel.isDefined) {
+          res = loadDataFrameForUpdate(
+            sqlContext,
+            dataFrame,
+            carbonLoadModel,
+            updateModel,
+            carbonTable)
+          res.foreach { resultOfSeg =>
+            resultOfSeg.foreach { resultOfBlock =>
+              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
+                loadStatus = SegmentStatus.LOAD_FAILURE
+                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
+                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
+                } else {
+                  updateModel.get.executorErrors = resultOfBlock._2._2
+                }
+              } else if (resultOfBlock._2._1.getSegmentStatus ==
+                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
               }
-            } else if (resultOfBlock._2._1.getSegmentStatus ==
-                SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-              loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
-              updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-              updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
             }
           }
-        }
-      } else {
-        status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
-          loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
-        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
-            dataFrame, carbonLoadModel, hadoopConf)
-        } else if (dataFrame.isDefined) {
-          loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
         } else {
-          loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
-        }
-        CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
-        val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
-        if (status.nonEmpty) {
-          status.foreach { eachLoadStatus =>
-            val state = newStatusMap.get(eachLoadStatus._1)
-            state match {
-              case Some(SegmentStatus.LOAD_FAILURE) =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-              case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
-                if eachLoadStatus._2._1.getSegmentStatus ==
-                   SegmentStatus.SUCCESS =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-              case _ =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-            }
+          status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
+            loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
+          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
+              dataFrame, carbonLoadModel, hadoopConf)
+          } else if (dataFrame.isDefined) {
+            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+          } else {
+            loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
           }
-
-          newStatusMap.foreach {
-            case (key, value) =>
-              if (value == SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_FAILURE
-              } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
-                loadStatus != SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
+          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
+          if (status.nonEmpty) {
+            status.foreach { eachLoadStatus =>
+              val state = newStatusMap.get(eachLoadStatus._1)
+              state match {
+                case Some(SegmentStatus.LOAD_FAILURE) =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
+                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+                  if eachLoadStatus._2._1.getSegmentStatus ==
+                     SegmentStatus.SUCCESS =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
+                case _ =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
              }
-          }
-        } else {
-          // if no value is there in data load, make load status Success
-          // and data load flow executes
-          if (dataFrame.isDefined && updateModel.isEmpty) {
-            val rdd = dataFrame.get.rdd
-            if (rdd.partitions == null || rdd.partitions.length == 0) {
-              LOGGER.warn("DataLoading finished. No data was loaded.")
-              loadStatus = SegmentStatus.SUCCESS
+            }
+
+            newStatusMap.foreach {
+              case (key, value) =>
+                if (value == SegmentStatus.LOAD_FAILURE) {
+                  loadStatus = SegmentStatus.LOAD_FAILURE
+                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+                           loadStatus != SegmentStatus.LOAD_FAILURE) {
+                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+                }
             }
           } else {
-            loadStatus = SegmentStatus.LOAD_FAILURE
+            // if no value is there in data load, make load status Success
+            // and data load flow executes
+            if (dataFrame.isDefined && updateModel.isEmpty) {
+              val rdd = dataFrame.get.rdd
+              if (rdd.partitions == null || rdd.partitions.length == 0) {
+                LOGGER.warn("DataLoading finished. No data was loaded.")
+                loadStatus = SegmentStatus.SUCCESS
+              }
+            } else {
+              loadStatus = SegmentStatus.LOAD_FAILURE
+            }
           }
-        }
-        if (loadStatus != SegmentStatus.LOAD_FAILURE &&
-            partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-          loadStatus = partitionStatus
+          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
+              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+            loadStatus = partitionStatus
+          }
         }
+      } else {
+        LOGGER.audit("Not able to acquire the segment lock for table " +
--- End diff --

Does this case ever come?
---
Github user dhatchayani commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158510211

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
(same hunk as quoted in the previous comment, ending at the added `LOGGER.audit("Not able to acquire the segment lock for table " +` line)
--- End diff --

no
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158519731

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
@@ -124,27 +148,50 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
     return false;
   }

+  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
+      String metadataPath) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
+        return oneLoad;
+      }
+    }
+    return null;
+  }
+
   public static boolean deleteLoadFoldersFromFileSystem(
       AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
+      LoadMetadataDetails[] details, String metadataPath) {
     boolean isDeleted = false;
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+          try {
+            if (segmentLock.lockWithRetries()) {
--- End diff --

It will add up time for each data load while trying to take the lock if parallel loads happen. Better to add another `lockWithRetries` method which takes very little time while acquiring the lock.
---
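The concern is that the default lockWithRetries() retries on a configured schedule, so a clean-files run could stall behind every in-progress load. One way to read the suggestion is an overload with an explicit, small retry budget. The two-argument method below is hypothetical (the quoted diff only has the no-argument form), and it assumes the subclass lock() primitive that the existing retry loop is built on:

// Hypothetical API sketch, e.g. on the abstract lock base class: attempt the
// lock at most retryCount times, sleeping retryTimeout seconds between attempts.
public boolean lockWithRetries(int retryCount, int retryTimeout) {
  try {
    for (int i = 0; i < retryCount; i++) {
      if (lock()) {
        return true;
      }
      if (i < retryCount - 1) {
        Thread.sleep(retryTimeout * 1000L);
      }
    }
  } catch (InterruptedException e) {
    return false;
  }
  return false;
}

A cleanup caller could then use something like segmentLock.lockWithRetries(1, 5) and simply skip the segment when the lock is busy, instead of waiting out the configured defaults.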
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158520072

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
(same hunk as quoted in the previous comment, ending at `if (segmentLock.lockWithRetries()) {`)
--- End diff --

Better to acquire the lock only for segments in in-progress status, not for the others.
---
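A sketch of that filtering inside the deletion loop, reusing the helpers from the quoted DeleteLoadFolders.java hunk; the SegmentStatus.INSERT_IN_PROGRESS marker is an assumption here (the thread only quotes the other status values):

// Sketch only: a load that is still in progress can race with its writer,
// so only that case needs the segment lock; finished loads are deleted directly.
for (LoadMetadataDetails oneLoad : details) {
  if (!checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
    continue;
  }
  String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
  if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
        CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
    try {
      if (segmentLock.lockWithRetries()) {
        // re-check the status under the lock before deleting (see the earlier comment)
        physicalFactAndMeasureMetadataDeletion(path);
      }
    } finally {
      segmentLock.unlock();
    }
  } else {
    physicalFactAndMeasureMetadataDeletion(path);
  }
}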
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1050/
---