
[GitHub] [carbondata] Kejian-Li commented on a change in pull request #4004: [WIP] update actomity

Posted by GitBox on Nov 26, 2020; 6:58am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-marchpure-opened-a-new-pull-request-4004-WIP-update-benchmark-tp103137p103613.html


Kejian-Li commented on a change in pull request #4004:
URL: https://github.com/apache/carbondata/pull/4004#discussion_r530796893



##########
File path: core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
##########
@@ -149,4 +151,28 @@ public boolean accept(CarbonFile pathName) {
       file.delete();
     }
   }
+
+  public static List<ICarbonLock> acquireLock(CarbonTable carbonTable,

Review comment:
      It would be better to name this method "acquireLocks", since it acquires and returns a list of locks.
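
      A minimal sketch of the call site, assuming the rename is applied (it mirrors the
      CarbonProjectForUpdateCommand diff quoted below; the plural name makes it clear
      that one lock per requested lock type comes back):

          // returns the acquired locks in the same order as locksToBeAcquired
          val acquiredLocks: java.util.List[ICarbonLock] =
            CarbonLockUtil.acquireLocks(carbonTable, locksToBeAcquired.asJava)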

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -590,13 +583,15 @@ object CarbonDataRDDFactory {
             carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
           }
           val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
+          if (!updateModel.isDefined) {

Review comment:
      It would be better to write this as (updateModel.isEmpty).
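
      A minimal sketch (plain Scala, not CarbonData code) showing why: isEmpty states
      the condition directly, while !isDefined is a double negative.

          val updateModel: Option[String] = None
          if (!updateModel.isDefined) println("not an update flow") // current style
          if (updateModel.isEmpty)    println("not an update flow") // suggested style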

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -224,17 +223,19 @@ case class CarbonMergeDataSetCommand(
       updateTableModel
     ).run(sparkSession)
 
-    if (hasDelAction && count == 0) {
-      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-        .getTableStatusFilePath(carbonTable.getTablePath))
-      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-        new Segment(loadMetadataDetail.getMergedLoadName,
-          loadMetadataDetail.getSegmentFile)).toSet.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString,
-        true,
-        true, new util.ArrayList[Segment]())
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
+      .getTableStatusFilePath(carbonTable.getTablePath))
+    var newMetaEntry: LoadMetadataDetails = null
+    if (!updateTableModel.get.addedLoadDetail.isEmpty

Review comment:
      Replace !updateTableModel.get.addedLoadDetail.isEmpty with
      updateTableModel.get.addedLoadDetail.isDefined or updateTableModel.get.addedLoadDetail.nonEmpty.
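
      On Option, isDefined and nonEmpty are the same test, so either spelling works;
      note that the second conjunct in the quoted condition is then redundant, because
      it repeats the first. A minimal sketch (plain Scala, not CarbonData code):

          val addedLoadDetail: Option[String] = Some("detail")
          // All three spellings agree; the positive forms read better.
          assert((!addedLoadDetail.isEmpty) == addedLoadDetail.isDefined)
          assert(addedLoadDetail.isDefined == addedLoadDetail.nonEmpty)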

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -224,17 +223,19 @@ case class CarbonMergeDataSetCommand(
       updateTableModel
     ).run(sparkSession)
 
-    if (hasDelAction && count == 0) {
-      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-        .getTableStatusFilePath(carbonTable.getTablePath))
-      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-        new Segment(loadMetadataDetail.getMergedLoadName,
-          loadMetadataDetail.getSegmentFile)).toSet.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString,
-        true,
-        true, new util.ArrayList[Segment]())
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
+      .getTableStatusFilePath(carbonTable.getTablePath))
+    var newMetaEntry: LoadMetadataDetails = null
+    if (!updateTableModel.get.addedLoadDetail.isEmpty
+      && updateTableModel.get.addedLoadDetail.isDefined) {
+      newMetaEntry = updateTableModel.get.addedLoadDetail.get
     }
+    CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
+      loadMetadataDetail.getMergedLoadName).toSet.asJava,

Review comment:
      Why getMergedLoadName? I think it should be getLoadName.
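
      A minimal sketch of the suggested change, using a hypothetical stand-in for the
      LoadMetadataDetails bean: the idea is to collect each segment's own load name
      rather than the name of the segment it was merged into.

          case class LoadDetail(loadName: String, mergedLoadName: String)
          val details = Seq(LoadDetail("0", "0.1"), LoadDetail("1", "0.1"))
          // each segment's own name, not the merge target's name
          val segmentNames = details.map(_.loadName).toSet // Set("0", "1")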

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -58,175 +54,97 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    var updatedRowCount = 0L
-    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
-    val res = plan find {
-      case relation: LogicalRelation if relation.relation
-        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        true
-      case _ => false
-    }
 
-    if (res.isEmpty) {
-      return Array(Row(updatedRowCount)).toSeq
-    }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    setAuditTable(carbonTable)
-    setAuditInfo(Map("plan" -> plan.simpleString))
-    // Do not allow spatial index and its source columns to be updated.
-    AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable, columns)
-    columns.foreach { col =>
-      val dataType = carbonTable.getColumnByName(col).getColumnSchema.getDataType
-      if (dataType.isComplexType) {
-        throw new UnsupportedOperationException("Unsupported operation on Complex data type")
-      }
-
-    }
-    if (!carbonTable.getTableInfo.isTransactionalTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
-    }
-    if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "loading", "data update")
-    }
-
-    if (!carbonTable.canAllow(carbonTable, TableOperation.UPDATE)) {
-      throw new MalformedCarbonCommandException(
-        "update operation is not supported for index")
-    }
+    IUDCommonUtil.checkPreconditionsForUpdate(sparkSession, logicPlan, carbonTable, columns)
 
-    // Block the update operation for non carbon formats
-    if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
-      throw new MalformedCarbonCommandException(
-        s"Unsupported update operation on table containing mixed format segments")
-    }
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("plan" -> logicPlan.simpleString))
 
-    // trigger event for Update table
+    // Step1: trigger PreUpdate event for table
     val operationContext = new OperationContext
-    val updateTablePreEvent: UpdateTablePreEvent =
-      UpdateTablePreEvent(sparkSession, carbonTable)
-    operationContext.setProperty("isLoadOrCompaction", false)
+    val updateTablePreEvent: UpdateTablePreEvent = UpdateTablePreEvent(sparkSession, carbonTable)
     OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
-    val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable
-      .getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
-    val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-      LockUsage.UPDATE_LOCK)
-    var lockStatus = false
-    // get the current time stamp which should be same for delete and update.
-    val currentTime = CarbonUpdateUtil.readCurrentTime
-    //    var dataFrame: DataFrame = null
-    var dataSet: DataFrame = null
-    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
-    var hasHorizontalCompactionException = false
-    var hasUpdateException = false
-    var fileTimestamp = ""
-    var updateTableModel: UpdateTableModel = null
-    try {
-      lockStatus = metadataLock.lockWithRetries()
-      if (lockStatus) {
-        logInfo("Successfully able to get the table metadata file lock")
-      }
-      else {
-        throw new Exception("Table is locked for update. Please try after some time")
-      }
 
-      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-      if (updateLock.lockWithRetries()) {
-        if (compactionLock.lockWithRetries()) {
-          // Get RDD.
-          dataSet = if (isPersistEnabled) {
-            Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
-              CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
-          }
-          else {
-            Dataset.ofRows(sparkSession, plan)
-          }
-          if (CarbonProperties.isUniqueValueCheckEnabled) {
-            // If more than one value present for the update key, should fail the update
-            val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-              .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-              .count()
-              .select("count")
-              .filter(col("count") > lit(1))
-              .limit(1)
-              .collect()
-            // tupleId represents the source rows that are going to get replaced.
-            // If same tupleId appeared more than once means key has more than one value to replace.
-            // which is undefined behavior.
-            if (ds.length > 0 && ds(0).getLong(0) > 1) {
-              throw new UnsupportedOperationException(
-                " update cannot be supported for 1 to N mapping, as more than one value present " +
-                "for the update key")
-            }
-          }
-
-          // do delete operation.
-          val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution(
-            databaseNameOp,
-            tableName,
-            sparkSession,
-            dataSet.rdd,
-            currentTime + "",
-            isUpdateOperation = true,
-            executionErrors)
-
-          if (executionErrors.failureCauses != FailureCauses.NONE) {
-            throw new Exception(executionErrors.errorMsg)
-          }
+    // Step2. acquire locks
+    val locksToBeAcquired =
+      List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK, LockUsage.UPDATE_LOCK)
+    val acquiredLocks =
+      CarbonLockUtil.acquireLock(carbonTable, locksToBeAcquired.asJava)
 
-          updatedRowCount = updatedRowCountTemp
-          updateTableModel =
-            UpdateTableModel(true, currentTime, executionErrors, segmentsToBeDeleted, Option.empty)
-          // do update operation.
-          performUpdate(dataSet,
-            databaseNameOp,
-            tableName,
-            plan,
-            sparkSession,
-            updateTableModel,
-            executionErrors)
-
-          // pre-priming for update command
-          DeleteExecution.reloadDistributedSegmentCache(carbonTable,
-            segmentsToBeDeleted, operationContext)(sparkSession)
-
-        } else {
-          throw new ConcurrentOperationException(carbonTable, "compaction", "update")
-        }
-      } else {
-        throw new ConcurrentOperationException(carbonTable, "update/delete", "update")
-      }
-      if (executionErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executionErrors.errorMsg)
-      }
+    // Initialize the variables
+    var updatedRowCount = 0L
+    var dataFrame: DataFrame = null
+    var hasUpdateException = false
+    val deltaDeltaFileTimestamp = CarbonUpdateUtil.readCurrentTime.toString
+    var updateTableModel: UpdateTableModel = null
+    val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
+    var updatedSegmentList: util.Set[String] = new util.HashSet[String]()
 
-      // Do IUD Compaction.
-      HorizontalCompaction.tryHorizontalCompaction(
-        sparkSession, carbonTable)
+    try {
+      // Step3 get updated data
+      dataFrame = Dataset.ofRows(sparkSession, logicPlan)
 
-      // Truncate materialized views on the current table.
-      val viewManager = MVManagerInSpark.get(sparkSession)
-      val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
-      if (!viewSchemas.isEmpty) {
-        viewManager.onTruncate(viewSchemas)
+      // Step4.1 check unique value if needed
+      if (CarbonProperties.isUniqueValueCheckEnabled) {
+        // If more than one value present for the update key, should fail the update
+        IUDCommonUtil.uniqueValueCheck(dataFrame)
       }
 
-      // trigger event for Update table
+      // Step4.2 calcute the non empty partition in dataframe, then try to coalesce partitions

Review comment:
       calcute   =>   calculate




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]