ajantha-bhat opened a new pull request #3785: URL: https://github.com/apache/carbondata/pull/3785

### Why is this PR needed?

### What changes were proposed in this PR?

### Does this PR introduce any user interface change?
- No
- Yes. (please explain the change and update document)

### Is any new testcase added?
- No
- Yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-637674877 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3125/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-637678524 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1401/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638037020 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3126/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638037693 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1402/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638183666 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1405/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638184776 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3129/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638412548 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3131/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638412987 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1407/
ajantha-bhat commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638607552 @QiangCai, please check
ajantha-bhat commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648760375
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648826422 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3209/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648829424 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1482/
ajantha-bhat commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658610534 retest this please
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658685680 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3394/
CarbonDataQA1 commented on pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658686549 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1651/
QiangCai commented on a change in pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#discussion_r456179554

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
       val carbonMainTable = alterTableMergeIndexEvent.carbonTable
       val sparkSession = alterTableMergeIndexEvent.sparkSession
-      if (!carbonMainTable.isStreamingSink) {
-        LOGGER.info(s"Merge Index request received for table " +
-                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-        val lock = CarbonLockFactory.getCarbonLockObj(
-          carbonMainTable.getAbsoluteTableIdentifier,
-          LockUsage.COMPACTION_LOCK)
+      LOGGER.info(s"Merge Index request received for table " +
+                  s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+      val lock = CarbonLockFactory.getCarbonLockObj(
+        carbonMainTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
-      try {
-        if (lock.lockWithRetries()) {
-          LOGGER.info("Acquired the compaction lock for table" +
-                      s" ${ carbonMainTable.getDatabaseName }.${
-                        carbonMainTable
-                          .getTableName
-                      }")
-          val segmentsToMerge =
-            if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-              val validSegments =
-                CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-              val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-              validSegments.foreach { segment =>
+      try {
+        if (lock.lockWithRetries()) {
+          LOGGER.info("Acquired the compaction lock for table" +
+                      s" ${ carbonMainTable.getDatabaseName }.${
+                        carbonMainTable
+                          .getTableName
+                      }")

Review comment: combine these lines to one line

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
       val carbonMainTable = alterTableMergeIndexEvent.carbonTable
       val sparkSession = alterTableMergeIndexEvent.sparkSession
-      if (!carbonMainTable.isStreamingSink) {
-        LOGGER.info(s"Merge Index request received for table " +
-                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-        val lock = CarbonLockFactory.getCarbonLockObj(
-          carbonMainTable.getAbsoluteTableIdentifier,
-          LockUsage.COMPACTION_LOCK)
+      LOGGER.info(s"Merge Index request received for table " +
+                  s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+      val lock = CarbonLockFactory.getCarbonLockObj(
+        carbonMainTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
-      try {
-        if (lock.lockWithRetries()) {
-          LOGGER.info("Acquired the compaction lock for table" +
-                      s" ${ carbonMainTable.getDatabaseName }.${
-                        carbonMainTable
-                          .getTableName
-                      }")
-          val segmentsToMerge =
-            if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-              val validSegments =
-                CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-              val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-              validSegments.foreach { segment =>
+      try {
+        if (lock.lockWithRetries()) {
+          LOGGER.info("Acquired the compaction lock for table" +
+                      s" ${ carbonMainTable.getDatabaseName }.${
+                        carbonMainTable
+                          .getTableName
+                      }")
+          val loadFolderDetailsArray = SegmentStatusManager
+            .readLoadMetadata(carbonMainTable.getMetadataPath)
+          val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+            String]()
+          var streamingSegment: Set[String] = Set[String]()
+          loadFolderDetailsArray.foreach(loadMetadataDetails => {
+            if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+              streamingSegment += loadMetadataDetails.getLoadName
+            }
+            segmentFileNameMap
+              .put(loadMetadataDetails.getLoadName,
+                String.valueOf(loadMetadataDetails.getLoadStartTime))
+          })
+          val segmentsToMerge =
+            if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+              val validSegments =
+                CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+              val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+              validSegments.foreach { segment =>
+                // do not add ROW_V1 format
+                if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                   validSegmentIds += segment.getSegmentNo
                 }
-              validSegmentIds
-            } else {
-              alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
               }
-
-          val loadFolderDetailsArray = SegmentStatusManager
-            .readLoadMetadata(carbonMainTable.getMetadataPath)
-          val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-            String]()
-          loadFolderDetailsArray.foreach(loadMetadataDetails => {
-            segmentFileNameMap
-              .put(loadMetadataDetails.getLoadName,
-                String.valueOf(loadMetadataDetails.getLoadStartTime))
-          })
-          // in case of merge index file creation using Alter DDL command
-          // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-          // store (store <= 1.1 version) and create merge Index file as per new store so that
-          // old store is also upgraded to new store
-          val startTime = System.currentTimeMillis()
-          CarbonMergeFilesRDD.mergeIndexFiles(
-            sparkSession = sparkSession,
-            segmentIds = segmentsToMerge,
-            segmentFileNameToSegmentIdMap = segmentFileNameMap,
-            tablePath = carbonMainTable.getTablePath,
-            carbonTable = carbonMainTable,
-            mergeIndexProperty = true,
-            readFileFooterFromCarbonDataFile = true)
-          LOGGER.info("Total time taken for merge index "
-            + (System.currentTimeMillis() - startTime) + "ms")
-          // clear Block index Cache
-          MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-          val requestMessage = "Compaction request completed for table " +
-            s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-          LOGGER.info(requestMessage)
-        } else {
-          val lockMessage = "Not able to acquire the compaction lock for table " +
-            s"${ carbonMainTable.getDatabaseName }." +
-            s"${ carbonMainTable.getTableName}"
-          LOGGER.error(lockMessage)
-          CarbonException.analysisException(
-            "Table is already locked for compaction. Please try after some time.")
-        }
-      } finally {
-        lock.unlock()
+              validSegmentIds
+            } else {
+              alterTableMergeIndexEvent.alterTableModel
+                .customSegmentIds
+                .get
+                .filter(!streamingSegment.contains(_))

Review comment: .filterNot(streamingSegment.contains(_))

##########
File path: docs/ddl-of-carbondata.md
##########

@@ -750,10 +750,6 @@ Users can specify which columns to include and exclude for local dictionary gene
   ALTER TABLE test_db.carbon COMPACT 'SEGMENT_INDEX'
   ```
-  **NOTE:**
-
-  * Merge index is not supported on streaming table.

Review comment: Merge index has been supported on streaming tables since 2.1, but streaming segments still do not get their index merged.
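The review exchange above centers on two points: streaming (ROW_V1) segments must be excluded from merge-index, and the negated `.filter(!streamingSegment.contains(_))` reads better as `.filterNot`. A minimal sketch of that filtering logic, using plain strings in place of CarbonData's LoadMetadataDetails (the `LoadDetail` case class and format constants here are illustrative stand-ins, not the real API):

```scala
object MergeIndexSegmentFilter {
  // Hypothetical simplified record standing in for LoadMetadataDetails.
  final case class LoadDetail(loadName: String, fileFormat: String)

  // Marker for streaming-segment format (mirrors FileFormat.ROW_V1).
  val RowV1 = "ROW_V1"

  // Collect the streaming (ROW_V1) segments so they can be excluded.
  def streamingSegments(details: Seq[LoadDetail]): Set[String] =
    details.collect { case d if d.fileFormat == RowV1 => d.loadName }.toSet

  // The reviewer's suggestion: filterNot instead of filter with a negation.
  def mergeableCustomSegments(customIds: Seq[String],
                              streaming: Set[String]): Seq[String] =
    customIds.filterNot(streaming.contains)

  def main(args: Array[String]): Unit = {
    val details = Seq(
      LoadDetail("0", "COLUMNAR_V3"),
      LoadDetail("1", RowV1),
      LoadDetail("2", "COLUMNAR_V3"))
    val streaming = streamingSegments(details)
    // Segment "1" is streaming, so it is dropped from the merge-index list.
    assert(streaming == Set("1"))
    assert(mergeableCustomSegments(Seq("0", "1", "2"), streaming) == Seq("0", "2"))
    println("ok")
  }
}
```

`filterNot(streaming.contains)` is equivalent to `filter(!streaming.contains(_))` but states the intent directly, which is why the reviewer preferred it.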
ajantha-bhat commented on a change in pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#discussion_r457056046

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

Review comment: done
ajantha-bhat commented on a change in pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#discussion_r457056257

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

Review comment: done
ajantha-bhat commented on a change in pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#discussion_r457057179

##########
File path: docs/ddl-of-carbondata.md
##########

Review comment: ok. done