[GitHub] ravipesala commented on a change in pull request #3090: [CARBONDATA-3262] Fix merge index failure handling for compacted segment



GitBox
ravipesala commented on a change in pull request #3090: [CARBONDATA-3262] Fix merge index failure handling for compacted segment
URL: https://github.com/apache/carbondata/pull/3090#discussion_r249809749
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -821,72 +822,78 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
-    LOGGER.info(s"compaction need status is" +
-                s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
-    if (!carbonTable.isChildDataMap &&
-        CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
-      val compactionSize = 0
-      val isCompactionTriggerByDDl = false
-      val compactionModel = CompactionModel(
-        compactionSize,
-        CompactionType.MINOR,
-        carbonTable,
-        isCompactionTriggerByDDl,
-        CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
-          TableIdentifier(carbonTable.getTableName,
-            Some(carbonTable.getDatabaseName))), None)
-      var storeLocation = ""
-      val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != configuredStore && configuredStore.nonEmpty) {
-        storeLocation = configuredStore(Random.nextInt(configuredStore.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-
-      val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-      ).equalsIgnoreCase("true")
-
-      if (!isConcurrentCompactionAllowed) {
-        handleCompactionForSystemLocking(sqlContext,
-          carbonLoadModel,
-          storeLocation,
+    try {
+      LOGGER.info(s"compaction need status is" +
+                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
+      if (!carbonTable.isChildDataMap &&
+          CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
+        val compactionSize = 0
+        val isCompactionTriggerByDDl = false
+        val compactionModel = CompactionModel(
+          compactionSize,
           CompactionType.MINOR,
           carbonTable,
-          compactedSegments,
-          compactionModel,
-          operationContext
-        )
-      } else {
-        val lock = CarbonLockFactory.getCarbonLockObj(
-          carbonTable.getAbsoluteTableIdentifier,
-          LockUsage.COMPACTION_LOCK)
+          isCompactionTriggerByDDl,
+          CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+            TableIdentifier(carbonTable.getTableName,
+              Some(carbonTable.getDatabaseName))), None)
+        var storeLocation = ""
+        val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != configuredStore && configuredStore.nonEmpty) {
+          storeLocation = configuredStore(Random.nextInt(configuredStore.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+        storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        if (lock.lockWithRetries()) {
-          LOGGER.info("Acquired the compaction lock.")
-          try {
-            startCompactionThreads(sqlContext,
-              carbonLoadModel,
-              storeLocation,
-              compactionModel,
-              lock,
-              compactedSegments,
-              operationContext
-            )
-          } catch {
-            case e: Exception =>
-              LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-              lock.unlock()
-              throw e
-          }
+        val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+        ).equalsIgnoreCase("true")
+
+        if (!isConcurrentCompactionAllowed) {
+          handleCompactionForSystemLocking(sqlContext,
+            carbonLoadModel,
+            storeLocation,
+            CompactionType.MINOR,
+            carbonTable,
+            compactedSegments,
+            compactionModel,
+            operationContext
+          )
         } else {
-          LOGGER.error("Not able to acquire the compaction lock for table " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+          val lock = CarbonLockFactory.getCarbonLockObj(
+            carbonTable.getAbsoluteTableIdentifier,
+            LockUsage.COMPACTION_LOCK)
+
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock.")
+            try {
+              startCompactionThreads(sqlContext,
+                carbonLoadModel,
+                storeLocation,
+                compactionModel,
+                lock,
+                compactedSegments,
+                operationContext
+              )
+            } catch {
+              case e: Exception =>
+                LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
+                lock.unlock()
+                throw e
+            }
+          } else {
+            LOGGER.error("Not able to acquire the compaction lock for table " +
+                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          }
         }
       }
+    } catch {
 
 Review comment:
   Better to handle this in the caller method.
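
   For context, the comment points at the new try/catch that wraps the entire body of the merging method in the diff; the suggestion is to let the exception propagate and decide how to handle the failure at the call site. Below is a minimal, self-contained Scala sketch of that shape. All names here are placeholders standing in for the CarbonData types shown in the diff; this is an illustration of the pattern, not the actual fix.

   object MergeFailureHandlingSketch {

     // Stands in for the merging method in the diff: it only does the merge work
     // and lets any exception propagate instead of wrapping its body in try/catch.
     def handleSegmentMerging(tableName: String): Unit = {
       // ... trigger auto compaction / merge index for the compacted segments ...
       if (tableName.isEmpty) throw new IllegalStateException("merge failed")
     }

     // The caller owns the failure policy: log and rethrow, or degrade gracefully.
     def loadAndMerge(tableName: String): Unit = {
       try {
         handleSegmentMerging(tableName)
       } catch {
         case e: Exception =>
           println(s"Auto merge failed for table $tableName: ${e.getMessage}")
           // decide here whether the surrounding load should fail or continue
       }
     }

     def main(args: Array[String]): Unit = loadAndMerge("my_table")
   }

   Whether the caller rethrows or only logs would depend on whether a merge-index failure for a compacted segment should fail the whole operation, which is the behavior this PR is adjusting.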

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services