CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732649478 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3116/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
marchpure commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732653163 retest this please ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732686545 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4873/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732691284 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3120/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732771366 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4876/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-732771764 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3123/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530796893 ########## File path: core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ########## @@ -149,4 +151,28 @@ public boolean accept(CarbonFile pathName) { file.delete(); } } + + public static List<ICarbonLock> acquireLock(CarbonTable carbonTable, Review comment: Name of the method is better to be "acquireLocks" ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -590,13 +583,15 @@ object CarbonDataRDDFactory { carbonLoadModel.setFactTimeStamp(System.currentTimeMillis()) } val compactedSegments = new util.ArrayList[String]() - handleSegmentMerging(sqlContext, - carbonLoadModel - .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter), - carbonTable, - compactedSegments, - operationContext) - carbonLoadModel.setMergedSegmentIds(compactedSegments) + if (!updateModel.isDefined) { Review comment: It is good to change as (updateModel.isEmpty) ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala ########## @@ -224,17 +223,19 @@ case class CarbonMergeDataSetCommand( updateTableModel ).run(sparkSession) - if (hasDelAction && count == 0) { - val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath - .getTableStatusFilePath(carbonTable.getTablePath)) - CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => - new Segment(loadMetadataDetail.getMergedLoadName, - loadMetadataDetail.getSegmentFile)).toSet.asJava, - carbonTable, - trxMgr.getLatestTrx.toString, - true, - true, new util.ArrayList[Segment]()) + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath + .getTableStatusFilePath(carbonTable.getTablePath)) + var newMetaEntry: LoadMetadataDetails = null + if (!updateTableModel.get.addedLoadDetail.isEmpty Review comment: !updateTableModel.get.addedLoadDetail.isEmpty => updateTableModel.get.addedLoadDetail.isDefined or updateTableModel.get.addedLoadDetail.nonEmpty ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala ########## @@ -224,17 +223,19 @@ case class CarbonMergeDataSetCommand( updateTableModel ).run(sparkSession) - if (hasDelAction && count == 0) { - val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath - .getTableStatusFilePath(carbonTable.getTablePath)) - CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => - new Segment(loadMetadataDetail.getMergedLoadName, - loadMetadataDetail.getSegmentFile)).toSet.asJava, - carbonTable, - trxMgr.getLatestTrx.toString, - true, - true, new util.ArrayList[Segment]()) + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath + .getTableStatusFilePath(carbonTable.getTablePath)) + var newMetaEntry: LoadMetadataDetails = null + if (!updateTableModel.get.addedLoadDetail.isEmpty + && updateTableModel.get.addedLoadDetail.isDefined) { + newMetaEntry = updateTableModel.get.addedLoadDetail.get } + CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => + loadMetadataDetail.getMergedLoadName).toSet.asJava, Review comment: why is getMergedLoadName, I think it may be getLoadName. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -58,175 +54,97 @@ private[sql] case class CarbonProjectForUpdateCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - var updatedRowCount = 0L - IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan) - val res = plan find { - case relation: LogicalRelation if relation.relation - .isInstanceOf[CarbonDatasourceHadoopRelation] => - true - case _ => false - } - if (res.isEmpty) { - return Array(Row(updatedRowCount)).toSeq - } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - setAuditTable(carbonTable) - setAuditInfo(Map("plan" -> plan.simpleString)) - // Do not allow spatial index and its source columns to be updated. - AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns) - columns.foreach { col => - val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType - if (dataType.isComplexType) { - throw new UnsupportedOperationException("Unsupported operation on Complex data type") - } - - } - if (!carbonTable.getTableInfo.isTransactionalTable) { - throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") - } - if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { - throw new ConcurrentOperationException(carbonTable, "loading", "data update") - } - - if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) { - throw new MalformedCarbonCommandException( - "update operation is not supported for index") - } + IUDCommonUtil.checkPreconditionsForUpdate(sparkSession, logicPlan, carbonTable, columns) - // Block the update operation for non carbon formats - if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) { - throw new MalformedCarbonCommandException( - s"Unsupported update operation on table containing mixed format segments") - } + setAuditTable(carbonTable) + setAuditInfo(Map("plan" -> logicPlan.simpleString)) - // trigger event for Update table + // Step1: trigger PreUpdate event for table val operationContext = new OperationContext - val updateTablePreEvent: UpdateTablePreEvent = - UpdateTablePreEvent(sparkSession, carbonTable) - operationContext.setProperty("isLoadOrCompaction", false) + val updateTablePreEvent: UpdateTablePreEvent = UpdateTablePreEvent(sparkSession, carbonTable) OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext) - val metadataLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.METADATA_LOCK) - val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable - .getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK) - val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.UPDATE_LOCK) - var lockStatus = false - // get the current time stamp which should be same for delete and update. - val currentTime = CarbonUpdateUtil.readCurrentTime - // var dataFrame: DataFrame = null - var dataSet: DataFrame = null - val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset - var hasHorizontalCompactionException = false - var hasUpdateException = false - var fileTimestamp = "" - var updateTableModel: UpdateTableModel = null - try { - lockStatus = metadataLock.lockWithRetries() - if (lockStatus) { - logInfo("Successfully able to get the table metadata file lock") - } - else { - throw new Exception("Table is locked for update. Please try after some time") - } - val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") - if (updateLock.lockWithRetries()) { - if (compactionLock.lockWithRetries()) { - // Get RDD. - dataSet = if (isPersistEnabled) { - Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString( - CarbonProperties.getInstance.getUpdateDatasetStorageLevel())) - } - else { - Dataset.ofRows(sparkSession, plan) - } - if (CarbonProperties.isUniqueValueCheckEnabled) { - // If more than one value present for the update key, should fail the update - val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) - .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) - .count() - .select("count") - .filter(col("count") > lit(1)) - .limit(1) - .collect() - // tupleId represents the source rows that are going to get replaced. - // If same tupleId appeared more than once means key has more than one value to replace. - // which is undefined behavior. - if (ds.length > 0 && ds(0).getLong(0) > 1) { - throw new UnsupportedOperationException( - " update cannot be supported for 1 to N mapping, as more than one value present " + - "for the update key") - } - } - - // do delete operation. - val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution( - databaseNameOp, - tableName, - sparkSession, - dataSet.rdd, - currentTime + "", - isUpdateOperation = true, - executionErrors) - - if (executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } + // Step2. acquire locks + val locksToBeAcquired = + List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK, LockUsage.UPDATE_LOCK) + val acquiredLocks = + CarbonLockUtil.acquireLock(carbonTable, locksToBeAcquired.asJava) - updatedRowCount = updatedRowCountTemp - updateTableModel = - UpdateTableModel(true, currentTime, executionErrors, segmentsToBeDeleted, Option.empty) - // do update operation. - performUpdate(dataSet, - databaseNameOp, - tableName, - plan, - sparkSession, - updateTableModel, - executionErrors) - - // pre-priming for update command - DeleteExecution.reloadDistributedSegmentCache(carbonTable, - segmentsToBeDeleted, operationContext)(sparkSession) - - } else { - throw new ConcurrentOperationException(carbonTable, "compaction", "update") - } - } else { - throw new ConcurrentOperationException(carbonTable, "update/delete", "update") - } - if (executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } + // Initialize the variables + var updatedRowCount = 0L + var dataFrame: DataFrame = null + var hasUpdateException = false + val deltaDeltaFileTimestamp = CarbonUpdateUtil.readCurrentTime.toString + var updateTableModel: UpdateTableModel = null + val executionErrors = ExecutionErrors(FailureCauses.NONE, "") + val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset + var updatedSegmentList: util.Set[String] = new util.HashSet[String]() - // Do IUD Compaction. - HorizontalCompaction.tryHorizontalCompaction( - sparkSession, carbonTable) + try { + // Step3 get updated data + dataFrame = Dataset.ofRows(sparkSession, logicPlan) - // Truncate materialized views on the current table. - val viewManager = MVManagerInSpark.get(sparkSession) - val viewSchemas = viewManager.getSchemasOnTable(carbonTable) - if (!viewSchemas.isEmpty) { - viewManager.onTruncate(viewSchemas) + // Step4.1 check unique value if needed + if (CarbonProperties.isUniqueValueCheckEnabled) { + // If more than one value present for the update key, should fail the update + IUDCommonUtil.uniqueValueCheck(dataFrame) } - // trigger event for Update table + // Step4.2 calcute the non empty partition in dataframe, then try to coalesce partitions Review comment: calcute => calculate ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530811074 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ########## @@ -350,99 +322,67 @@ object DeleteExecution { resultIter } - (res, blockMappingVO) + res } // all or none : update status file, only if complete delete operation is successful. def checkAndUpdateStatusFiles( executorErrors: ExecutionErrors, - res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], carbonTable: CarbonTable, timestamp: String, - blockMappingVO: BlockMappingVO, - isUpdateOperation: Boolean): Seq[Segment] = { - val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() - val segmentDetails = new util.HashSet[Segment]() - res.foreach(resultOfSeg => resultOfSeg.foreach( - resultOfBlock => { - if (resultOfBlock._1 == SegmentStatus.SUCCESS) { - blockUpdateDetailsList.add(resultOfBlock._2._1) - segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName)) - // if this block is invalid then decrement block count in map. - if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) { - CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, - blockMappingVO.getSegmentNumberOfBlockMapping) - } - } else { - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - val errorMsg = - "Delete data operation is failed due to failure in creating delete delta file for " + - "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + - resultOfBlock._2._1.getBlockName - executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - executorErrors.errorMsg = resultOfBlock._2._2.errorMsg + isUpdateOperation: Boolean, + updateModel: UpdateTableModel, + blockUpdateDetailsList: java.util.List[SegmentUpdateDetails], + updatedSegmentList: util.Set[String]): Unit = { - if (executorErrors.failureCauses == FailureCauses.NONE) { - executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - executorErrors.errorMsg = errorMsg - } - LOGGER.error(errorMsg) - return Seq.empty[Segment] - } - })) + val updateSegmentStatus = CarbonUpdateUtil + .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) - val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil - .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) + if (!updateSegmentStatus) { + val errorMessage = "Update data operation is failed due to failure " + + "in updatetablestatus update." + LOGGER.error("Delete data operation is failed due to failure in updatetablestatus update.") + executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE + executorErrors.errorMsg = errorMessage + throw new Exception(executorErrors.errorMsg) + } - val segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala + var newMetaEntry: LoadMetadataDetails = null + if (isUpdateOperation && !updateModel.addedLoadDetail.isEmpty + && updateModel.addedLoadDetail.isDefined) { + newMetaEntry = updateModel.addedLoadDetail.get + } + + val updateTableMetadataStatus = CarbonUpdateUtil.updateTableMetadataStatus(updatedSegmentList, + carbonTable, timestamp, true, true, + new util.ArrayList[Segment](0), newMetaEntry) // this is delete flow so no need of putting timestamp in the status file. Review comment: this line is useless, it should be removed I think. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530817986 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala ########## @@ -17,20 +17,213 @@ package org.apache.spark.sql.execution.command.mutation -import org.apache.spark.sql._ +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.strategy.MixedFormatHandler +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.view.{MVSchema, MVStatus} +import org.apache.carbondata.events.{Event, OperationContext, OperationListenerBus, UpdateTablePostEvent} +import org.apache.carbondata.view.MVManagerInSpark + /** * Util for IUD common function */ object IUDCommonUtil { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def tryHorizontalCompaction(sparkSession: SparkSession, + carbonTable: CarbonTable, + updatedSegmentList: Set[String]): Unit = { + var hasCompactionException = false + var compactTimestamp = "" + try { + HorizontalCompaction.tryHorizontalCompaction( + sparkSession, carbonTable, updatedSegmentList) + } catch { + case e: HorizontalCompactionException => + LOGGER.error( + "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) + // In case of failure , clean all related delta files + compactTimestamp = e.compactionTimeStamp.toString + hasCompactionException = true + } finally { + if (hasCompactionException) { + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, compactTimestamp) + } + } + } + + def refreshMVandIndex(sparkSession: SparkSession, + carbonTable: CarbonTable, operationContext: OperationContext, event: Event): Unit = { + if (CarbonProperties.getInstance().isMVEnabled) { + var hasMaintainMVException = false + val viewManager = MVManagerInSpark.get(sparkSession) + var viewSchemas: util.List[MVSchema] = new util.ArrayList + try { + // Truncate materialized views on the current table. + viewSchemas = viewManager.getSchemasOnTable(carbonTable) + if (!viewSchemas.isEmpty) { + viewManager.onTruncate(viewSchemas) + } + // Load materialized views on the current table. + OperationListenerBus.getInstance.fireEvent(event, operationContext) + } catch { + case e: Exception => + hasMaintainMVException = true + LOGGER.error("Maintain MV in Update operation failed. Please check logs." + e) + } finally { + if (hasMaintainMVException) { + viewManager.setStatus(viewSchemas, MVStatus.DISABLED) + } + } + } + } + + + def coalesceDataSetIfNeeded(dataset: Dataset[Row], + nonEmptyPartitionCount: Long, + isPersistEnabled: Boolean): Dataset[Row] = { + val ratioOfNonEmptyPartition: Float = nonEmptyPartitionCount / dataset.rdd.getNumPartitions + var coalescedDataSet: Dataset[Row] = dataset + if (ratioOfNonEmptyPartition < 0.5f) { + coalescedDataSet = dataset.coalesce(nonEmptyPartitionCount.toInt) + } + if (isPersistEnabled) { + coalescedDataSet = coalescedDataSet.persist( + StorageLevel.fromString(CarbonProperties.getInstance() + .getUpdateDatasetStorageLevel())) + } + coalescedDataSet + } + + def countNonEmptyPartitions(sparkSession: SparkSession, dataset: Dataset[Row], + carbonTable: CarbonTable, uuid: String): Long = { + val metricName = "nonEmptyPart" + val accumulatorName = getAccumulatorName(carbonTable, uuid, metricName) + val nonEmptyPart = sparkSession.sparkContext.longAccumulator(accumulatorName) + dataset.foreachPartition(partition => + if (!partition.isEmpty) { + nonEmptyPart.add(1) + } + ) + nonEmptyPart.value + } + + def getAccumulatorName(carbonTable: CarbonTable, uuid: String, metricName: String): String = { + s"${carbonTable.getTableId}_${uuid}_${metricName}" + } + + def uniqueValueCheck(dataset: Dataset[Row]): Unit = { + val ds = dataset.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) + .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) + .count() + .select("count") + .filter(col("count") > lit(1)) + .limit(1) + .collect() + // tupleId represents the source rows that are going to get replaced. + // If same tupleId appeared more than once means key has more than one value to replace. + // which is undefined behavior. + if (ds.length > 0 && ds(0).getLong(0) > 1) { + throw new UnsupportedOperationException( + " update cannot be supported for 1 to N mapping, as more than one value present " + + "for the update key") + } + } + + def checkPreconditionsForDelete(sparkSession: SparkSession, + logicalPlan: LogicalPlan, + carbonTable: CarbonTable): Unit = { + IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan) + IUDCommonUtil.checkIsTranstionTable(carbonTable) + IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable) + IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.DELETE) + IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable) + } + + def checkPreconditionsForUpdate(sparkSession: SparkSession, + logicalPlan: LogicalPlan, + carbonTable: CarbonTable, + columns: List[String]): Unit = { + IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan) + IUDCommonUtil.checkIsTranstionTable(carbonTable) + IUDCommonUtil.checkIfSpartialColumnsExists(carbonTable, columns) + IUDCommonUtil.checkIfColumnWithComplexTypeExists(carbonTable, columns) + IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable) + IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.UPDATE) + IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable) + } + + def checkIsHeterogeneousSegmentTable(carbonTable: CarbonTable): Unit = { + if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) { + throw new MalformedCarbonCommandException( + s"Unsupported operation on table containing mixed format segments") + } + } + + def checkIsIndexedTable(carbonTable: CarbonTable, operation: TableOperation): Unit = { + if (!carbonTable.canAllow(carbonTable, operation)) { + throw new MalformedCarbonCommandException( + "update/delete operation is not supported for index") + } + } + + def checkIsLoadInProgressInTable(carbonTable: CarbonTable): Unit = { + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + throw new ConcurrentOperationException(carbonTable, "loading", "data update") + } + } + + def checkIsTranstionTable(carbonTable: CarbonTable): Unit = { Review comment: Transtion => Transaction ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530818500 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala ########## @@ -17,20 +17,213 @@ package org.apache.spark.sql.execution.command.mutation -import org.apache.spark.sql._ +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.strategy.MixedFormatHandler +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.view.{MVSchema, MVStatus} +import org.apache.carbondata.events.{Event, OperationContext, OperationListenerBus, UpdateTablePostEvent} +import org.apache.carbondata.view.MVManagerInSpark + /** * Util for IUD common function */ object IUDCommonUtil { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def tryHorizontalCompaction(sparkSession: SparkSession, + carbonTable: CarbonTable, + updatedSegmentList: Set[String]): Unit = { + var hasCompactionException = false + var compactTimestamp = "" + try { + HorizontalCompaction.tryHorizontalCompaction( + sparkSession, carbonTable, updatedSegmentList) + } catch { + case e: HorizontalCompactionException => + LOGGER.error( + "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) + // In case of failure , clean all related delta files + compactTimestamp = e.compactionTimeStamp.toString + hasCompactionException = true + } finally { + if (hasCompactionException) { + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, compactTimestamp) + } + } + } + + def refreshMVandIndex(sparkSession: SparkSession, + carbonTable: CarbonTable, operationContext: OperationContext, event: Event): Unit = { + if (CarbonProperties.getInstance().isMVEnabled) { + var hasMaintainMVException = false + val viewManager = MVManagerInSpark.get(sparkSession) + var viewSchemas: util.List[MVSchema] = new util.ArrayList + try { + // Truncate materialized views on the current table. + viewSchemas = viewManager.getSchemasOnTable(carbonTable) + if (!viewSchemas.isEmpty) { + viewManager.onTruncate(viewSchemas) + } + // Load materialized views on the current table. + OperationListenerBus.getInstance.fireEvent(event, operationContext) + } catch { + case e: Exception => + hasMaintainMVException = true + LOGGER.error("Maintain MV in Update operation failed. Please check logs." + e) + } finally { + if (hasMaintainMVException) { + viewManager.setStatus(viewSchemas, MVStatus.DISABLED) + } + } + } + } + + + def coalesceDataSetIfNeeded(dataset: Dataset[Row], + nonEmptyPartitionCount: Long, + isPersistEnabled: Boolean): Dataset[Row] = { + val ratioOfNonEmptyPartition: Float = nonEmptyPartitionCount / dataset.rdd.getNumPartitions + var coalescedDataSet: Dataset[Row] = dataset + if (ratioOfNonEmptyPartition < 0.5f) { + coalescedDataSet = dataset.coalesce(nonEmptyPartitionCount.toInt) + } + if (isPersistEnabled) { + coalescedDataSet = coalescedDataSet.persist( + StorageLevel.fromString(CarbonProperties.getInstance() + .getUpdateDatasetStorageLevel())) + } + coalescedDataSet + } + + def countNonEmptyPartitions(sparkSession: SparkSession, dataset: Dataset[Row], + carbonTable: CarbonTable, uuid: String): Long = { + val metricName = "nonEmptyPart" + val accumulatorName = getAccumulatorName(carbonTable, uuid, metricName) + val nonEmptyPart = sparkSession.sparkContext.longAccumulator(accumulatorName) + dataset.foreachPartition(partition => + if (!partition.isEmpty) { + nonEmptyPart.add(1) + } + ) + nonEmptyPart.value + } + + def getAccumulatorName(carbonTable: CarbonTable, uuid: String, metricName: String): String = { + s"${carbonTable.getTableId}_${uuid}_${metricName}" + } + + def uniqueValueCheck(dataset: Dataset[Row]): Unit = { + val ds = dataset.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) + .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) + .count() + .select("count") + .filter(col("count") > lit(1)) + .limit(1) + .collect() + // tupleId represents the source rows that are going to get replaced. + // If same tupleId appeared more than once means key has more than one value to replace. + // which is undefined behavior. + if (ds.length > 0 && ds(0).getLong(0) > 1) { + throw new UnsupportedOperationException( + " update cannot be supported for 1 to N mapping, as more than one value present " + + "for the update key") + } + } + + def checkPreconditionsForDelete(sparkSession: SparkSession, + logicalPlan: LogicalPlan, + carbonTable: CarbonTable): Unit = { + IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan) + IUDCommonUtil.checkIsTranstionTable(carbonTable) + IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable) + IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.DELETE) + IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable) + } + + def checkPreconditionsForUpdate(sparkSession: SparkSession, + logicalPlan: LogicalPlan, + carbonTable: CarbonTable, + columns: List[String]): Unit = { + IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan) + IUDCommonUtil.checkIsTranstionTable(carbonTable) + IUDCommonUtil.checkIfSpartialColumnsExists(carbonTable, columns) + IUDCommonUtil.checkIfColumnWithComplexTypeExists(carbonTable, columns) + IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable) + IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.UPDATE) + IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable) + } + + def checkIsHeterogeneousSegmentTable(carbonTable: CarbonTable): Unit = { + if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) { + throw new MalformedCarbonCommandException( + s"Unsupported operation on table containing mixed format segments") + } + } + + def checkIsIndexedTable(carbonTable: CarbonTable, operation: TableOperation): Unit = { + if (!carbonTable.canAllow(carbonTable, operation)) { + throw new MalformedCarbonCommandException( + "update/delete operation is not supported for index") + } + } + + def checkIsLoadInProgressInTable(carbonTable: CarbonTable): Unit = { + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + throw new ConcurrentOperationException(carbonTable, "loading", "data update") + } + } + + def checkIsTranstionTable(carbonTable: CarbonTable): Unit = { + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") + } + } + + def checkIfColumnWithComplexTypeExists(carbonTable: CarbonTable, columns: List[String]): Unit = { + columns.foreach { col => + val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType + if (dataType.isComplexType) { + throw new UnsupportedOperationException("Unsupported operation on Complex data type") + } + } + } + + def checkIfSpartialColumnsExists(carbonTable: CarbonTable, columns: List[String]): Unit = { Review comment: Spartial => Spatial ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530845249 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ########## @@ -299,37 +279,29 @@ object DeleteExecution { .getDeleteDeltaFilePath(blockPath, blockName, timestamp) val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath) - - + segmentUpdateDetails.setSegmentName(segmentId) segmentUpdateDetails.setBlockName(blockName) segmentUpdateDetails.setActualBlockName(completeBlockName) - segmentUpdateDetails.setSegmentName(load.getLoadName) segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp) segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp) - val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock val totalDeletedRows: Long = alreadyDeletedRows + countOfRows segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString) - if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) { - segmentUpdateDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) - } Review comment: Why remove if clause? in CarbonUpdateUitl.mergeSegmentUpdate(), line 259 would set blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
Kejian-Li commented on a change in pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#discussion_r530845249 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ########## @@ -299,37 +279,29 @@ object DeleteExecution { .getDeleteDeltaFilePath(blockPath, blockName, timestamp) val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath) - - + segmentUpdateDetails.setSegmentName(segmentId) segmentUpdateDetails.setBlockName(blockName) segmentUpdateDetails.setActualBlockName(completeBlockName) - segmentUpdateDetails.setSegmentName(load.getLoadName) segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp) segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp) - val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock val totalDeletedRows: Long = alreadyDeletedRows + countOfRows segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString) - if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) { - segmentUpdateDetails.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) - } Review comment: Why remove if clause? CarbonUpdateUitl.mergeSegmentUpdate() method would set blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-735943690 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4978/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-735944674 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3223/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-736730411 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5001/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-736734171 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3246/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-743769722 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5146/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-743769968 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3384/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-743946025 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3385/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4004: URL: https://github.com/apache/carbondata/pull/4004#issuecomment-743946136 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5147/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |