Posted by GitBox on Nov 26, 2020; 7:25am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-marchpure-opened-a-new-pull-request-4004-WIP-update-benchmark-tp103137p103618.html
Kejian-Li commented on a change in pull request #4004:
URL: https://github.com/apache/carbondata/pull/4004#discussion_r530817986

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
##########
@@ -17,20 +17,213 @@
package org.apache.spark.sql.execution.command.mutation
-import org.apache.spark.sql._
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
+import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.view.{MVSchema, MVStatus}
+import org.apache.carbondata.events.{Event, OperationContext, OperationListenerBus, UpdateTablePostEvent}
+import org.apache.carbondata.view.MVManagerInSpark
+
/**
* Util for IUD common function
*/
object IUDCommonUtil {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
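+ /**
+  * Triggers horizontal compaction on the updated segments. If compaction fails
+  * with a HorizontalCompactionException, the stale delta files created at the
+  * failed compaction timestamp are cleaned up.
+  */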
+ def tryHorizontalCompaction(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ updatedSegmentList: Set[String]): Unit = {
+ var hasCompactionException = false
+ var compactTimestamp = ""
+ try {
+ HorizontalCompaction.tryHorizontalCompaction(
+ sparkSession, carbonTable, updatedSegmentList)
+ } catch {
+ case e: HorizontalCompactionException =>
+ LOGGER.error(
+ "Update operation succeeded, but horizontal compaction failed. Please check logs. " + e)
+ // In case of failure, clean up all related delta files
+ compactTimestamp = e.compactionTimeStamp.toString
+ hasCompactionException = true
+ } finally {
+ if (hasCompactionException) {
+ CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, compactTimestamp)
+ }
+ }
+ }
+
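+ /**
+  * Truncates the materialized views on the given table and fires the update
+  * event so that listeners reload MVs and indexes. If MV maintenance fails,
+  * the affected MV schemas are marked as DISABLED.
+  */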
+ def refreshMVandIndex(sparkSession: SparkSession,
+ carbonTable: CarbonTable, operationContext: OperationContext, event: Event): Unit = {
+ if (CarbonProperties.getInstance().isMVEnabled) {
+ var hasMaintainMVException = false
+ val viewManager = MVManagerInSpark.get(sparkSession)
+ var viewSchemas: util.List[MVSchema] = new util.ArrayList
+ try {
+ // Truncate materialized views on the current table.
+ viewSchemas = viewManager.getSchemasOnTable(carbonTable)
+ if (!viewSchemas.isEmpty) {
+ viewManager.onTruncate(viewSchemas)
+ }
+ // Load materialized views on the current table.
+ OperationListenerBus.getInstance.fireEvent(event, operationContext)
+ } catch {
+ case e: Exception =>
+ hasMaintainMVException = true
+ LOGGER.error("Maintain MV in Update operation failed. Please check logs." + e)
+ } finally {
+ if (hasMaintainMVException) {
+ viewManager.setStatus(viewSchemas, MVStatus.DISABLED)
+ }
+ }
+ }
+ }
+
+
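+ /**
+  * Coalesces the dataset down to the number of non-empty partitions when less
+  * than half of its partitions are non-empty, and optionally persists the
+  * result at the configured update dataset storage level.
+  */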
+ def coalesceDataSetIfNeeded(dataset: Dataset[Row],
+ nonEmptyPartitionCount: Long,
+ isPersistEnabled: Boolean): Dataset[Row] = {
+ // Use floating-point division; integer division would truncate the ratio to 0.
+ val ratioOfNonEmptyPartition: Float =
+ nonEmptyPartitionCount.toFloat / dataset.rdd.getNumPartitions
+ var coalescedDataSet: Dataset[Row] = dataset
+ if (ratioOfNonEmptyPartition < 0.5f) {
+ // Guard against coalesce(0) when every partition is empty.
+ coalescedDataSet = dataset.coalesce(math.max(nonEmptyPartitionCount.toInt, 1))
+ }
+ if (isPersistEnabled) {
+ coalescedDataSet = coalescedDataSet.persist(
+ StorageLevel.fromString(CarbonProperties.getInstance()
+ .getUpdateDatasetStorageLevel()))
+ }
+ coalescedDataSet
+ }
+
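+ /**
+  * Counts the non-empty partitions of the dataset using a long accumulator
+  * named after the table id, the given uuid and the metric name.
+  */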
+ def countNonEmptyPartitions(sparkSession: SparkSession, dataset: Dataset[Row],
+ carbonTable: CarbonTable, uuid: String): Long = {
+ val metricName = "nonEmptyPart"
+ val accumulatorName = getAccumulatorName(carbonTable, uuid, metricName)
+ val nonEmptyPart = sparkSession.sparkContext.longAccumulator(accumulatorName)
+ dataset.foreachPartition((partition: Iterator[Row]) =>
+ if (!partition.isEmpty) {
+ nonEmptyPart.add(1)
+ }
+ )
+ nonEmptyPart.value
+ }
+
+ def getAccumulatorName(carbonTable: CarbonTable, uuid: String, metricName: String): String = {
+ s"${carbonTable.getTableId}_${uuid}_${metricName}"
+ }
+
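+ /**
+  * Validates that each tupleId maps to at most one updated value; a 1 to N
+  * mapping would make the update result non-deterministic, so it is rejected.
+  */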
+ def uniqueValueCheck(dataset: Dataset[Row]): Unit = {
+ val ds = dataset.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+ .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+ .count()
+ .select("count")
+ .filter(col("count") > lit(1))
+ .limit(1)
+ .collect()
+ // tupleId identifies the source row that is going to be replaced.
+ // If the same tupleId appears more than once, the update key maps to more
+ // than one value, which is undefined behavior.
+ if (ds.length > 0 && ds(0).getLong(0) > 1) {
+ throw new UnsupportedOperationException(
+ " update cannot be supported for 1 to N mapping, as more than one value present " +
+ "for the update key")
+ }
+ }
+
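+ /**
+  * Runs all precondition checks for a delete operation; each check throws if
+  * the current table state does not allow the delete.
+  */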
+ def checkPreconditionsForDelete(sparkSession: SparkSession,
+ logicalPlan: LogicalPlan,
+ carbonTable: CarbonTable): Unit = {
+ IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan)
+ IUDCommonUtil.checkIsTranstionTable(carbonTable)
+ IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable)
+ IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.DELETE)
+ IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable)
+ }
+
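+ /**
+  * Runs all precondition checks for an update operation, including checks on
+  * the updated columns (spatial and complex-type columns are not updatable).
+  */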
+ def checkPreconditionsForUpdate(sparkSession: SparkSession,
+ logicalPlan: LogicalPlan,
+ carbonTable: CarbonTable,
+ columns: List[String]): Unit = {
+ IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, logicalPlan)
+ IUDCommonUtil.checkIsTranstionTable(carbonTable)
+ IUDCommonUtil.checkIfSpartialColumnsExists(carbonTable, columns)
+ IUDCommonUtil.checkIfColumnWithComplexTypeExists(carbonTable, columns)
+ IUDCommonUtil.checkIsHeterogeneousSegmentTable(carbonTable)
+ IUDCommonUtil.checkIsIndexedTable(carbonTable, TableOperation.UPDATE)
+ IUDCommonUtil.checkIsLoadInProgressInTable(carbonTable)
+ }
+
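+ /**
+  * Rejects update/delete on tables containing segments stored in other
+  * (non-carbon) formats.
+  */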
+ def checkIsHeterogeneousSegmentTable(carbonTable: CarbonTable): Unit = {
+ if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+ throw new MalformedCarbonCommandException(
+ s"Unsupported operation on table containing mixed format segments")
+ }
+ }
+
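+ /**
+  * Rejects update/delete when an associated index does not allow the
+  * operation on this table.
+  */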
+ def checkIsIndexedTable(carbonTable: CarbonTable, operation: TableOperation): Unit = {
+ if (!carbonTable.canAllow(carbonTable, operation)) {
+ throw new MalformedCarbonCommandException(
+ "update/delete operation is not supported for index")
+ }
+ }
+
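+ /**
+  * Rejects update/delete while a data load is in progress on the table.
+  */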
+ def checkIsLoadInProgressInTable(carbonTable: CarbonTable): Unit = {
+ if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+ throw new ConcurrentOperationException(carbonTable, "loading", "data update")
+ }
+ }
+
+ def checkIsTranstionTable(carbonTable: CarbonTable): Unit = {
Review comment:
Transtion => Transaction
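
A minimal sketch of the rename (assuming the truncated method body checks
CarbonTable.isTransactionalTable, and that the call sites in
checkPreconditionsForUpdate and checkPreconditionsForDelete are renamed to
match) could look like:

  def checkIsTransactionTable(carbonTable: CarbonTable): Unit = {
    // Hypothetical body: reject IUD operations on non-transactional tables.
    if (!carbonTable.isTransactionalTable) {
      throw new MalformedCarbonCommandException(
        "Unsupported operation on non-transactional table")
    }
  }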