[GitHub] [carbondata] ajantha-bhat opened a new pull request #3785: [WIP] Fix merge index issue in streaming table

classic Classic list List threaded Threaded
24 messages Options
12
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat opened a new pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox

ajantha-bhat opened a new pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785


    ### Why is this PR needed?
   
   
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-637674877


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3125/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-637678524


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1401/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638037020


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3126/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638037693


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1402/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638183666


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1405/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638184776


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3129/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638412548


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3131/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [WIP] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638412987


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1407/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-638607552


   @QiangCai, please check


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648760375






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648826422


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3209/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-648829424


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1482/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658610534


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658685680


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3394/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#issuecomment-658686549


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1651/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] QiangCai commented on a change in pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

QiangCai commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r456179554



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")

Review comment:
       combine these lines to one line

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                 }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filter(!streamingSegment.contains(_))

Review comment:
       .filterNot(streamingSegment.contains(_))

##########
File path: docs/ddl-of-carbondata.md
##########
@@ -750,10 +750,6 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE test_db.carbon COMPACT 'SEGMENT_INDEX'
      ```
 
-     **NOTE:**
-
-     * Merge index is not supported on streaming table.

Review comment:
       Merge index is supported on the streaming table since 2.1. But the streaming segments don't merge index still.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r457056046



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                 }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filter(!streamingSegment.contains(_))

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r457056257



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3785: [CARBONDATA-3843] Fix merge index issue in streaming table

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r457057179



##########
File path: docs/ddl-of-carbondata.md
##########
@@ -750,10 +750,6 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE test_db.carbon COMPACT 'SEGMENT_INDEX'
      ```
 
-     **NOTE:**
-
-     * Merge index is not supported on streaming table.

Review comment:
       ok. done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


12