CarbonDataQA1 commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-719410167 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2976/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#discussion_r514946130 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -340,7 +340,8 @@ private[sql] case class CarbonProjectForUpdateCommand( case _ => sys.error("") } - val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, deletedSegments) + val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, deletedSegments, + !carbonRelation.carbonTable.isHivePartitionTable) Review comment: done, please have a review ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#discussion_r514948928 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala ########## @@ -103,7 +103,7 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll { sql("update carbon set(a)=(10) where a=1").collect() checkAnswer(sql("select count(*) from carbon where a=10"), Seq(Row(3))) showCache = sql("show metacache on table carbon").collect() - assert(showCache(0).get(2).toString.equalsIgnoreCase("6/8 index files cached")) + assert(showCache(0).get(2).toString.equalsIgnoreCase("1/6 index files cached")) Review comment: The reason of why there is only 1 index files cached: For now. the HorizontalCompaction will trigger IndexStoreManager.getInstance().clearInvalidSegments, which will clean all index of valid segments, other guys is handling this performance issue ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-719473427 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4737/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-719473886 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2979/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat edited a comment on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-720258234 @shenjiayu17 & @marchpure @QiangCai : **Have you analyzed which exact change is responsible for performance improvement?** I don't think it is because of listFiles changes as it will make difference only if stale files are present. Probably writing as a new segment. But still, I don't get why this much difference, I think you have enabled horizontal compaction, now there is no horizontal compaction. You cannot call it as improvement as the query will slow down due to many segments and actual compaction need to run. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on a change in pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#discussion_r515819017 ########## File path: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ########## @@ -26,7 +26,6 @@ public enum CompactionType { MINOR, MAJOR, - IUD_UPDDEL_DELTA, Review comment: I have modified code according to your suggestion ########## File path: docs/configuration-parameters.md ########## @@ -104,8 +104,7 @@ This section provides the details of all the configurations required for the Car | carbon.number.of.cores.while.compacting | 2 | Number of cores to be used while compacting data. This also determines the number of threads to be used to read carbondata files in parallel. | | carbon.compaction.level.threshold | 4, 3 | Each CarbonData load will create one segment, if every load is small in size it will generate many small file over a period of time impacting the query performance. This configuration is for minor compaction which decides how many segments to be merged. Configuration is of the form (x,y). Compaction will be triggered for every x segments and form a single level 1 compacted segment. When the number of compacted level 1 segments reach y, compaction will be triggered again to merge them to form a single level 2 segment. For example: If it is set as 2, 3 then minor compaction will be triggered for every 2 segments. 3 is the number of level 1 compacted segments which is further compacted to new segment. **NOTE:** When ***carbon.enable.auto.load.merge*** is **true**, configuring higher values cause overall data loading time to increase as compaction will be triggered after data loading is complete but status is not returned till compaction is c omplete. But compacting more number of segments can increase query performance. Hence optimal values needs to be configured based on the business scenario. Valid values are between 0 to 100. | | carbon.major.compaction.size | 1024 | To improve query performance and all the segments can be merged and compacted to a single segment upto configured size. This Major compaction size can be configured using this parameter. Sum of the segments which is below this threshold will be merged. This value is expressed in MB. | -| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delta (DELETE/ UPDATE) files becomes more than specified threshold. **NOTE:** Having many delta files will reduce the query performance as scan has to happen on all these files before the final state of data can be decided. Hence it is advisable to keep horizontal compaction enabled and configure reasonable values to ***carbon.horizontal.UPDATE.compaction.threshold*** and ***carbon.horizontal.DELETE.compaction.threshold*** | -| carbon.horizontal.update.compaction.threshold | 1 | This configuration specifies the threshold limit on number of UPDATE delta files within a segment. In case the number of delta files goes beyond the threshold, the UPDATE delta files within the segment becomes eligible for horizontal compaction and are compacted into single UPDATE delta file. Values range between 1 to 10000. | +| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delete delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delete delta files becomes more than specified threshold. **NOTE:** Having many delta files will reduce the query performance as scan has to happen on all these files before the final state of data can be decided. Hence it is advisable to keep horizontal compaction enabled and configure reasonable values to ***carbon.horizontal.UPDATE.compaction.threshold*** and ***carbon.horizontal.DELETE.compaction.threshold*** | Review comment: yes. it won't be unused anywhere ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -373,9 +374,7 @@ public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStam @Override public boolean accept(CarbonFile file) { String fileName = file.getName(); - return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT) Review comment: I have modified code according to your suggestion ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -808,19 +808,6 @@ private CarbonCommonConstants() { */ public static final int NUMBER_OF_SEGMENT_COMPACTED_PERTIME_UPPER_LIMIT = 10000; - /** - * Number of Update Delta files which is the Threshold for IUD compaction. - * Only accepted Range is 0 - 10000. Outside this range system will pick default value. - */ - @CarbonProperty - public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = Review comment: I have modified code according to your suggestion ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. Review comment: I have modified code according to your suggestion ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) Review comment: I will raise another PR and give an optimazation in this week ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand( "for the update key") } } - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) Review comment: Clean Files Command maybe a better solution. ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -1050,14 +752,15 @@ object CarbonDataRDDFactory { } var done = true // If the updated data should be added as new segment then update the segment information - if (updateModel.isDefined && updateModel.get.loadAsNewSegment) { + if (updateModel.isDefined) { Review comment: I will raise another PR and give an optimazation in this week ########## File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ########## @@ -1243,10 +1256,14 @@ public static void updateTableStatusInCaseOfFailure(String loadName, SegmentStatusManager.readLoadMetadata(metaDataPath); boolean ifTableStatusUpdateRequired = false; for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) { - if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && loadName - .equalsIgnoreCase(loadMetadataDetail.getLoadName())) { + if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS + || loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS Review comment: I have modified code according to your suggestion ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) + + // Delete The New Inserted Segment + CarbonLoaderUtil.updateTableStatusInCaseOfFailure( Review comment: I have modified code according to your suggestion ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -342,33 +336,8 @@ object CarbonDataRDDFactory { try { if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) { - if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) { - res = loadDataFrameForUpdate( - sqlContext, - dataFrame, - carbonLoadModel, - updateModel, - carbonTable, - hadoopConf, - segmentMetaDataAccumulator) - res.foreach { resultOfSeg => - resultOfSeg.foreach { resultOfBlock => - if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { - loadStatus = SegmentStatus.LOAD_FAILURE - if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - updateModel.get.executorErrors.errorMsg = "Failure in the Executor." - } else { - updateModel.get.executorErrors = resultOfBlock._2._2 - } - } else if (resultOfBlock._2._1.getSegmentStatus == - SegmentStatus.LOAD_PARTIAL_SUCCESS) { - loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS - updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg - } - } - } + if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) { + // if the rowtoupdated is empty, do nothing Review comment: I have modified code according to your suggestion ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -276,14 +278,13 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + if (isUpdateStatusFileUpdateRequired && + loadMetadata.getLoadName().equalsIgnoreCase("0")) { + loadMetadata.setUpdateStatusFileName( + CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp)); + } if (isTimestampUpdateRequired) { - // we are storing the link between the 2 status files in the segment 0 only. Review comment: I have modified code according to your suggestion ########## File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ########## @@ -101,8 +101,8 @@ public SegmentIndexFileStore(Configuration configuration) { * @param segmentPath * @throws IOException */ - public void readAllIIndexOfSegment(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration); + public void readAllIIndexOfSegment(String segmentPath, String uuid) throws IOException { Review comment: I have modified code according to your suggestion ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-720330566 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-720258234 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-720927959 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#discussion_r515759986 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -808,19 +808,6 @@ private CarbonCommonConstants() { */ public static final int NUMBER_OF_SEGMENT_COMPACTED_PERTIME_UPPER_LIMIT = 10000; - /** - * Number of Update Delta files which is the Threshold for IUD compaction. - * Only accepted Range is 0 - 10000. Outside this range system will pick default value. - */ - @CarbonProperty - public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = Review comment: please look up `carbon.horizontal.update.compaction.threshold` and `UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION` in code. still many places it is present ########## File path: docs/configuration-parameters.md ########## @@ -104,8 +104,7 @@ This section provides the details of all the configurations required for the Car | carbon.number.of.cores.while.compacting | 2 | Number of cores to be used while compacting data. This also determines the number of threads to be used to read carbondata files in parallel. | | carbon.compaction.level.threshold | 4, 3 | Each CarbonData load will create one segment, if every load is small in size it will generate many small file over a period of time impacting the query performance. This configuration is for minor compaction which decides how many segments to be merged. Configuration is of the form (x,y). Compaction will be triggered for every x segments and form a single level 1 compacted segment. When the number of compacted level 1 segments reach y, compaction will be triggered again to merge them to form a single level 2 segment. For example: If it is set as 2, 3 then minor compaction will be triggered for every 2 segments. 3 is the number of level 1 compacted segments which is further compacted to new segment. **NOTE:** When ***carbon.enable.auto.load.merge*** is **true**, configuring higher values cause overall data loading time to increase as compaction will be triggered after data loading is complete but status is not returned till compaction is c omplete. But compacting more number of segments can increase query performance. Hence optimal values needs to be configured based on the business scenario. Valid values are between 0 to 100. | | carbon.major.compaction.size | 1024 | To improve query performance and all the segments can be merged and compacted to a single segment upto configured size. This Major compaction size can be configured using this parameter. Sum of the segments which is below this threshold will be merged. This value is expressed in MB. | -| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delta (DELETE/ UPDATE) files becomes more than specified threshold. **NOTE:** Having many delta files will reduce the query performance as scan has to happen on all these files before the final state of data can be decided. Hence it is advisable to keep horizontal compaction enabled and configure reasonable values to ***carbon.horizontal.UPDATE.compaction.threshold*** and ***carbon.horizontal.DELETE.compaction.threshold*** | -| carbon.horizontal.update.compaction.threshold | 1 | This configuration specifies the threshold limit on number of UPDATE delta files within a segment. In case the number of delta files goes beyond the threshold, the UPDATE delta files within the segment becomes eligible for horizontal compaction and are compacted into single UPDATE delta file. Values range between 1 to 10000. | +| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delete delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delete delta files becomes more than specified threshold. **NOTE:** Having many delta files will reduce the query performance as scan has to happen on all these files before the final state of data can be decided. Hence it is advisable to keep horizontal compaction enabled and configure reasonable values to ***carbon.horizontal.UPDATE.compaction.threshold*** and ***carbon.horizontal.DELETE.compaction.threshold*** | Review comment: Need to remove `carbon.horizontal.UPDATE.compaction.threshold` ? ########## File path: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ########## @@ -26,7 +26,6 @@ public enum CompactionType { MINOR, MAJOR, - IUD_UPDDEL_DELTA, Review comment: Don't remove ENUM, it may give compatibility issues. ########## File path: core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java ########## @@ -86,28 +85,14 @@ public synchronized SegmentMetaDataInfo getTableSegmentMetaDataInfo(String table public synchronized void setBlockMetaDataInfo(String tableName, String segmentId, BlockColumnMetaDataInfo currentBlockColumnMetaInfo) { // check if tableName is present in tableSegmentMetaDataInfoMap - if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != this.tableSegmentMetaDataInfoMap - .get(tableName) && null != this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) { - // get previous blockColumn metadata information - BlockColumnMetaDataInfo previousBlockColumnMetaInfo = - this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId); - // compare and get updated min and max values - byte[][] updatedMin = BlockIndex.compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(), Review comment: ok ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -373,9 +374,7 @@ public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStam @Override public boolean accept(CarbonFile file) { String fileName = file.getName(); - return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT) Review comment: Also can remove `UPDATE_DELTA_FILE_EXT` and `UPDATE_INDEX_FILE_EXT` from code ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand( "for the update key") } } - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) Review comment: @QiangCai , @marchpure : In the release notes or upgrade guide, we can suggest to the user that if update files are present, compact (horizontal) before upgrading. Just my opinion. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#issuecomment-720270269 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3999: URL: https://github.com/apache/carbondata/pull/3999#discussion_r515768710 ########## File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ########## @@ -101,8 +101,8 @@ public SegmentIndexFileStore(Configuration configuration) { * @param segmentPath * @throws IOException */ - public void readAllIIndexOfSegment(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration); + public void readAllIIndexOfSegment(String segmentPath, String uuid) throws IOException { Review comment: after it added uuid parameter, how about to change the comments or method name? ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -342,33 +336,8 @@ object CarbonDataRDDFactory { try { if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) { - if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) { - res = loadDataFrameForUpdate( - sqlContext, - dataFrame, - carbonLoadModel, - updateModel, - carbonTable, - hadoopConf, - segmentMetaDataAccumulator) - res.foreach { resultOfSeg => - resultOfSeg.foreach { resultOfBlock => - if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { - loadStatus = SegmentStatus.LOAD_FAILURE - if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - updateModel.get.executorErrors.errorMsg = "Failure in the Executor." - } else { - updateModel.get.executorErrors = resultOfBlock._2._2 - } - } else if (resultOfBlock._2._1.getSegmentStatus == - SegmentStatus.LOAD_PARTIAL_SUCCESS) { - loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS - updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg - } - } - } + if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) { + // if the rowtoupdated is empty, do nothing Review comment: rowtoupdated => rowToUpdated ########## File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ########## @@ -1243,10 +1256,14 @@ public static void updateTableStatusInCaseOfFailure(String loadName, SegmentStatusManager.readLoadMetadata(metaDataPath); boolean ifTableStatusUpdateRequired = false; for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) { - if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && loadName - .equalsIgnoreCase(loadMetadataDetail.getLoadName())) { + if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS + || loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS Review comment: 1. updateTableStatusInCaseOfFailure method should not check SUCCESS/LOAD_PARTIAL_SUCCESS segment. 2. let's keep code simple, better to change the solution. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) + + // Delete The New Inserted Segment + CarbonLoaderUtil.updateTableStatusInCaseOfFailure( Review comment: 1. better to get loadName from the insertInto command 2. update and insert should update tablestatus once at the end. if insert success but update not finish or failed, in some time another concurrent query will give a wrong result. it shouldn't break ACID rule. 3. refresh index cache for deleted segment only, it should not contain updated segment. ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -276,14 +278,13 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + if (isUpdateStatusFileUpdateRequired && + loadMetadata.getLoadName().equalsIgnoreCase("0")) { + loadMetadata.setUpdateStatusFileName( + CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp)); + } if (isTimestampUpdateRequired) { - // we are storing the link between the 2 status files in the segment 0 only. Review comment: keep this comment ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) + + // Delete The New Inserted Segment + CarbonLoaderUtil.updateTableStatusInCaseOfFailure( Review comment: 1. better to get loadName from the insertInto command 2. update and insert should update tablestatus once at the end. if insert success but update not finish or failed, in some time another concurrent query will give a wrong result. it shouldn't break ACID rule. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) + + // Delete The New Inserted Segment + CarbonLoaderUtil.updateTableStatusInCaseOfFailure( Review comment: better to get loadName from the insertInto command ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -1050,14 +752,15 @@ object CarbonDataRDDFactory { } var done = true // If the updated data should be added as new segment then update the segment information - if (updateModel.isDefined && updateModel.get.loadAsNewSegment) { + if (updateModel.isDefined) { Review comment: line 756 and line 766 update tablestatus two times how about to merge it to one time ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand( "for the update key") } } - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) Review comment: if we remove it, we will not support the legacy store. we need to check it at first(whether is ok or not if we not support the legacy store). ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. // So moving it to the end and it is outside of locking. CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) Review comment: pass related segment to the cleanStaleDeltaFiles method ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand( // When the table has too many segemnts, it will take a long time. Review comment: how about separate update and horizonCompaction. if horizonCompaction failed, the update may be already successful. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand( "for the update key") } } - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) Review comment: @ajantha-bhat ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
asfgit closed pull request #3999: URL: https://github.com/apache/carbondata/pull/3999 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |