kunal642 commented on a change in pull request #3090: [CARBONDATA-3262] Fix merge index failure handling for compacted segment
URL: https://github.com/apache/carbondata/pull/3090#discussion_r249813436 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -821,72 +822,78 @@ object CarbonDataRDDFactory { carbonTable: CarbonTable, compactedSegments: java.util.List[String], operationContext: OperationContext): Unit = { - LOGGER.info(s"compaction need status is" + - s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }") - if (!carbonTable.isChildDataMap && - CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) { - val compactionSize = 0 - val isCompactionTriggerByDDl = false - val compactionModel = CompactionModel( - compactionSize, - CompactionType.MINOR, - carbonTable, - isCompactionTriggerByDDl, - CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, - TableIdentifier(carbonTable.getTableName, - Some(carbonTable.getDatabaseName))), None) - var storeLocation = "" - val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) - if (null != configuredStore && configuredStore.nonEmpty) { - storeLocation = configuredStore(Random.nextInt(configuredStore.length)) - } - if (storeLocation == null) { - storeLocation = System.getProperty("java.io.tmpdir") - } - storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - - val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, - CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION - ).equalsIgnoreCase("true") - - if (!isConcurrentCompactionAllowed) { - handleCompactionForSystemLocking(sqlContext, - carbonLoadModel, - storeLocation, + try { + LOGGER.info(s"compaction need status is" + + s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }") + if (!carbonTable.isChildDataMap && + CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) { + val compactionSize = 0 + val isCompactionTriggerByDDl = false + val compactionModel = CompactionModel( + compactionSize, CompactionType.MINOR, carbonTable, - compactedSegments, - compactionModel, - operationContext - ) - } else { - val lock = CarbonLockFactory.getCarbonLockObj( - carbonTable.getAbsoluteTableIdentifier, - LockUsage.COMPACTION_LOCK) + isCompactionTriggerByDDl, + CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, + Some(carbonTable.getDatabaseName))), None) + var storeLocation = "" + val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != configuredStore && configuredStore.nonEmpty) { + storeLocation = configuredStore(Random.nextInt(configuredStore.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the compaction lock.") - try { - startCompactionThreads(sqlContext, - carbonLoadModel, - storeLocation, - compactionModel, - lock, - compactedSegments, - operationContext - ) - } catch { - case e: Exception => - LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") - lock.unlock() - throw e - } + val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION + ).equalsIgnoreCase("true") + + if (!isConcurrentCompactionAllowed) { + handleCompactionForSystemLocking(sqlContext, + carbonLoadModel, + storeLocation, + CompactionType.MINOR, + carbonTable, + compactedSegments, + compactionModel, + operationContext + ) } else { - LOGGER.error("Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}") + val lock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the compaction lock.") + try { + startCompactionThreads(sqlContext, + carbonLoadModel, + storeLocation, + compactionModel, + lock, + compactedSegments, + operationContext + ) + } catch { + case e: Exception => + LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") + lock.unlock() + throw e + } + } else { + LOGGER.error("Not able to acquire the compaction lock for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + } } } + } catch { + case ex: Exception => + LOGGER.error(s"Auto Compaction failed for table ${ Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
Free forum by Nabble | Edit this page |