Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158520831

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
@@ -315,88 +314,100 @@ object CarbonDataRDDFactory {

[quoted diff hunk: a segmentLock is created via CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK), and the existing update/load and segment-status handling inside the try block is re-indented to run only when segmentLock.lockWithRetries() succeeds, with an else branch that writes LOGGER.audit("Not able to acquire the segment lock for table " + ...)]

--- End diff --

how can you update on load in progress segment

---
Github user dhatchayani commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158524394

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
@@ -124,27 +148,50 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
     return false;
   }

+  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
+      String metadataPath) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
+        return oneLoad;
+      }
+    }
+    return null;
+  }
+
   public static boolean deleteLoadFoldersFromFileSystem(
       AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
+      LoadMetadataDetails[] details, String metadataPath) {
     boolean isDeleted = false;
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+          try {
+            if (segmentLock.lockWithRetries()) {
--- End diff --

this can be solved by PR 1708, there we have added one more lockWithRetries with retry and timeout arguments.

---
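To make the check discussed above concrete, here is a minimal sketch (in Scala for brevity, not the PR's actual Java code) of how the clean-files path can decide whether a segment folder is safe to delete: it deletes only when the per-segment lock can be acquired, which fails while a concurrent load still holds it. The helper name canDeleteSegment is hypothetical, and the import paths assume the 1.3.x code tree; the CarbonLockFactory / CarbonTablePath / LockUsage calls are the ones quoted in the diff above.

import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.path.CarbonTablePath

// Sketch: decide whether a stale segment folder may be physically deleted.
def canDeleteSegment(identifier: AbsoluteTableIdentifier, segmentId: String): Boolean = {
  val segmentLock = CarbonLockFactory.getCarbonLockObj(
    identifier, CarbonTablePath.addSegmentPrefix(segmentId) + LockUsage.LOCK)
  // lockWithRetries() succeeds only if no in-progress load owns this segment's lock.
  // (Per the comment above, PR 1708 adds an overload taking explicit retry count
  // and timeout arguments for callers that need tighter control.)
  val acquired = segmentLock.lockWithRetries()
  if (acquired) {
    // Nothing is loading this segment, so release the lock and allow deletion.
    segmentLock.unlock()
  }
  acquired
}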
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1702

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2527/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1061/
---
Github user dhatchayani commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158690647

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
@@ -315,88 +314,100 @@ object CarbonDataRDDFactory {

[same diff hunk as quoted above at discussion_r158520831]

--- End diff --

removed else part

---
Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702

retest this please
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1702

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2557/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1116/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1121/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2335/
---
Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702

retest this please
---
Github user dhatchayani closed the pull request at:
https://github.com/apache/carbondata/pull/1702
---
GitHub user dhatchayani reopened a pull request:
https://github.com/apache/carbondata/pull/1702

[CARBONDATA-1896] Clean files operation improvement

**Problem:**
When a session is brought up, the clean operation marks all INSERT_OVERWRITE_IN_PROGRESS or INSERT_IN_PROGRESS segments as MARKED_FOR_DELETE in the tablestatus file. This clean operation does not consider other parallel sessions: if another session's data load is IN_PROGRESS while one session is being brought up, that executing load is also changed to MARKED_FOR_DELETE irrespective of its actual load status. Cleaning stale segments during session bring-up also increases the time it takes to bring up a session.

**Solution:**
A SEGMENT_LOCK should be taken on the new segment while loading. While cleaning segments, both the tablestatus file and the SEGMENT_LOCK should be considered. Cleaning stale files at session bring-up should be removed; it can either be done manually on the needed tables through the already existing CLEAN FILES DDL, or the next load on the table will clean them.

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [x] Testing done
      Manual Testing
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhatchayani/incubator-carbondata clean_files

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1702.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1702

----

commit 4573f5fbcc7d0414323513e8746f9050f9eb1e78
Author: dhatchayani <dhatcha.official@...>
Date:   2017-12-20T17:05:31Z

    [CARBONDATA-1896] Clean files operation improvement

----
---
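The solution described above amounts to wrapping every data load in a per-segment lock so that clean files (and stale-segment cleanup) can detect in-progress loads. Below is a minimal sketch of that pattern, assuming only the CarbonLockFactory, CarbonTablePath and LockUsage calls quoted in the review diffs; the loadSegmentWithLock wrapper and the doLoad callback are hypothetical, and the import paths assume the 1.3.x code tree.

import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.path.CarbonTablePath

// Sketch: hold SEGMENT_LOCK on the new segment for the duration of the load,
// so a parallel session cannot treat it as stale while it is in progress.
def loadSegmentWithLock(identifier: AbsoluteTableIdentifier, segmentId: String)
    (doLoad: => Unit): Boolean = {
  val segmentLock: ICarbonLock = CarbonLockFactory.getCarbonLockObj(
    identifier, CarbonTablePath.addSegmentPrefix(segmentId) + LockUsage.LOCK)
  if (segmentLock.lockWithRetries()) {
    try {
      doLoad      // the actual data load runs while the segment lock is held
      true
    } finally {
      segmentLock.unlock()  // release so CLEAN FILES or the next load can process this segment
    }
  } else {
    false         // could not acquire the lock; caller treats the load attempt as failed
  }
}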
Github user dhatchayani commented on the issue:
https://github.com/apache/carbondata/pull/1702

retest this please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2344/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2346/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1128/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1131/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1702

SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2567/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1702

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2357/
---