[GitHub] [carbondata] ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r278907341
 
 

 ##########
 File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
 ##########
 @@ -91,22 +108,255 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRebuild =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRebuild) {
+        isOverwriteTable = true
+      }
+      queryPlan.queryExecution.optimizedPlan transformDown {
+        case join@Join(l1, l2, jointype, condition) =>
+          // TODO: Support Incremental loading for multiple tables with join - CARBONDATA-3340
+          isOverwriteTable = true
+          join
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      if (dataMapSchema.isLazy) {
+        // check if a rebuild of the datamap is already in progress and throw an exception
+        val loadMetaDataDetails = SegmentStatusManager
+          .readLoadMetadata(dataMapTable.getMetadataPath)
+        if (loadMetaDataDetails.nonEmpty) {
+          for (loadMetaDetail <- loadMetaDataDetails) {
+            if ((loadMetaDetail.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+                 loadMetaDetail.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+                SegmentStatusManager
+                  .isLoadInProgress(dataMapTable.getAbsoluteTableIdentifier,
+                    loadMetaDetail.getLoadName)) {
+              throw new RuntimeException(
+                "Rebuild to datamap " + dataMapSchema.getDataMapName + " is already in progress")
+            }
+          }
+        }
+      }
+      var segmentMap: String = ""
+      if (!isOverwriteTable) {
+        // Set the main table segments to load for incremental data loading.
+        // Compare the main table segment info with the datamap table segment map info and load
+        // only the newly added main table segments into the datamap table
+        val (isIncrementLoad, segmentMapping) = setSegmentsBasedOnMapping(dataMapTable
+          .getMetadataPath, dataMapTable.getAbsoluteTableIdentifier)
+        if (!isIncrementLoad) {
+          return
+        }
+        segmentMap = segmentMapping
+      } else {
+        val segmentMapping = new util.HashMap[String, java.util.List[String]]()
+        val relationIdentifiers = dataMapSchema.getParentTables.asScala
+        for (relationIdentifier <- relationIdentifiers) {
+          val mainTableSegmentList = getMainTableValidSegmentList(relationIdentifier)
+          segmentMapping.put(relationIdentifier.getTableName, mainTableSegmentList)
+        }
+        segmentMap = new Gson().toJson(segmentMapping)
+      }
+
+      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map(CarbonCommonConstants.DATAMAP_MAINTABLE_SEGMENTMAP -> segmentMap),
         partition = Map.empty)
 
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+      } finally {
+        unsetMainTableSegments()
+      }
+    }
+  }
+
+  /**
+   * This method compares the mainTable and dataMapTable segment lists and loads only the newly
+   * added main table segments into the dataMap table. If the mainTable has been compacted, the
+   * dataMap is reloaded based on the dataMap-to-mainTable segment mapping.
+   *
+   * Eg: case 1: Consider mainTableSegmentList {0, 1, 2} and dataMapToMainTable segmentMap
+   * {0 -> (0), 1 -> (1, 2)}. If segments (1, 2) of the main table are compacted to 1.1 and a new
+   * segment (3) is loaded to the main table, then mainTableSegmentList becomes {0, 1.1, 3}.
+   * In this case, segment (1) of the dataMap table is marked for delete, and a new dataMap
+   * segment {2 -> (1.1, 3)} is loaded to the dataMap table.
+   *
+   * case 2: Consider mainTableSegmentList {0, 1, 2, 3} and dataMapToMainTable segmentMap
+   * {0 -> (0, 1, 2), 1 -> (3)}. If segments (1, 2) of the main table are compacted to 1.1 and a
+   * new segment (4) is loaded to the main table, then mainTableSegmentList becomes
+   * {0, 1.1, 3, 4}. In this case, segment (0) of the dataMap table is marked for delete, and
+   * segment (0) of the main table is added back to the validSegmentList to be loaded again.
+   * A new dataMap table segment {2 -> (1.1, 4, 0)} is then loaded to the dataMap table, and the
+   * dataMapToMainTable segmentMap becomes {1 -> (3), 2 -> (1.1, 4, 0)} after the rebuild.
+   */
+  def setSegmentsBasedOnMapping(dataMapTableMetaDataPath: String,
+      dataMapTableAbsoluteTableIdentifier: AbsoluteTableIdentifier):
+  (Boolean, String) = {
+    val segmentMap = new util.HashMap[String, java.util.List[String]]()
+    val relationIdentifiers = dataMapSchema.getParentTables.asScala
+    val loadMetaDetails = SegmentStatusManager.readLoadMetadata(dataMapTableMetaDataPath)
+    if (loadMetaDetails.isEmpty) {
+      // If the segment map is empty, load all valid segments from the main tables to the dataMap
+      for (relationIdentifier <- relationIdentifiers) {
+        val mainTableSegmentList = getMainTableValidSegmentList(relationIdentifier)
+        // If mainTableSegmentList is empty, no need to trigger load command
+        // TODO: handle in case of multiple tables load to datamap table
+        if (mainTableSegmentList.isEmpty) {
+          return (false, "")
+        }
+        segmentMap.put(relationIdentifier.getTableName, mainTableSegmentList)
+        setSegmentsToLoadDataMap(relationIdentifier, mainTableSegmentList)
+      }
+    } else {
+      for (relationIdentifier <- relationIdentifiers) {
+        var dataMapTableSegmentList: java.util.List[String] = new util.ArrayList[String]()
+        for (loadMetaDetail <- loadMetaDetails) {
+          if (loadMetaDetail.getSegmentStatus == SegmentStatus.SUCCESS) {
+            val segmentMap: java.util.Map[String, java.util.List[String]] =
+              DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo)
+            dataMapTableSegmentList.addAll(segmentMap.get(relationIdentifier.getTableName))
+          }
+        }
+        val dataMapSegmentList: java.util.List[String] = new util.ArrayList[String]()
+        dataMapSegmentList.addAll(dataMapTableSegmentList)
+        // Get all segments for parent relationIdentifier
+        val mainTableSegmentList = getMainTableValidSegmentList(relationIdentifier)
+        dataMapTableSegmentList.removeAll(mainTableSegmentList)
+        mainTableSegmentList.removeAll(dataMapSegmentList)
+        if (mainTableSegmentList.isEmpty) {
+          return (false, "")
+        }
+        if (!dataMapTableSegmentList.isEmpty) {
+          val invalidMainTableSegmentList = new java.util.ArrayList[String]()
+          // validMainTableSegmentList holds segment list which needs to be loaded again
+          val validMainTableSegmentList = new java.util.HashSet[String]()
+          // invalidDataMapSegmentList holds segment list which needs to be marked for delete
+          val invalidDataMapSegmentList = new java.util.HashSet[String]()
+
+          // For dataMap segments which are not in the main table segment list (if the main
+          // table is compacted), iterate over those segments to collect the dataMap segments
+          // which need to be marked for delete and the main table segments which need to be
+          // loaded again
+          dataMapTableSegmentList.asScala.foreach({ id =>
+            for (loadMetaDetail <- loadMetaDetails) {
+              if (loadMetaDetail.getSegmentStatus == SegmentStatus.SUCCESS) {
+                val segmentMap: java.util.Map[String, java.util.List[String]] =
+                  DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo)
+                val segments = segmentMap.get(relationIdentifier.getTableName)
+                if (segments.contains(id)) {
+                  segments.remove(id)
+                  validMainTableSegmentList.addAll(segments)
+                  invalidMainTableSegmentList.add(id)
+                  invalidDataMapSegmentList.add(loadMetaDetail.getLoadName)
+                }
+              }
+            }
+          })
+          var ifTableStatusNeedsToBeUpdated = false
+          // remove invalid segment from validMainTableSegmentList if present
+          validMainTableSegmentList.removeAll(invalidMainTableSegmentList)
+          // Remove invalid dataMap segment and update dataMap table status file
+          for (loadMetadataDetail <- loadMetaDetails) {
+            if (invalidDataMapSegmentList.contains(loadMetadataDetail.getLoadName)) {
+              ifTableStatusNeedsToBeUpdated = true
+              loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+            }
+          }
+          if (ifTableStatusNeedsToBeUpdated) {
+            val segmentStatusManager =
+              new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier)
+            val carbonLock: ICarbonLock = segmentStatusManager.getTableStatusLock
 
 Review comment:
   Include this method in the caller's lock as well, and avoid writing the table status file multiple times.
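For illustration, a minimal sketch of what the suggestion amounts to, assuming the caller acquires the table status lock once, applies every status change in memory, and writes the status file a single time. The helper markAndWriteOnce is hypothetical; SegmentStatusManager, ICarbonLock and CarbonTablePath follow their usage in the diff above, and the exact writeLoadDetailsIntoFile signature is an assumption, not a confirmed API contract:

import org.apache.carbondata.core.locks.ICarbonLock
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath

// Hypothetical helper sketching the review suggestion: one lock, one write.
object StatusWriteOnce {
  def markAndWriteOnce(
      identifier: AbsoluteTableIdentifier,
      metadataPath: String,
      invalidDataMapSegments: java.util.Set[String]): Unit = {
    val carbonLock: ICarbonLock = new SegmentStatusManager(identifier).getTableStatusLock
    try {
      if (carbonLock.lockWithRetries()) {
        // Read the current load details once, under the lock.
        val details: Array[LoadMetadataDetails] =
          SegmentStatusManager.readLoadMetadata(metadataPath)
        // Apply all pending changes in memory, as the per-segment loop in the
        // diff does, instead of writing the status file per change.
        details.foreach { detail =>
          if (invalidDataMapSegments.contains(detail.getLoadName)) {
            detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
          }
        }
        // Single write of the table status file (writer signature assumed from
        // SegmentStatusManager usage elsewhere in CarbonData).
        SegmentStatusManager.writeLoadDetailsIntoFile(
          CarbonTablePath.getTableStatusFilePath(identifier.getTablePath), details)
      }
    } finally {
      carbonLock.unlock()
    }
  }
}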
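Separately, the segment arithmetic documented in the scaladoc above can be traced with plain Scala collections. A minimal, self-contained sketch of case 1 follows; the names mainTableSegments, dataMapSegmentMap and so on are illustrative, not CarbonData APIs:

// Case 1 from the scaladoc: the datamap previously loaded main table segments
// {0 -> (0), 1 -> (1, 2)}; segments 1 and 2 were then compacted to 1.1 and a
// new segment 3 was loaded, so the main table now holds {0, 1.1, 3}.
val mainTableSegments = Set("0", "1.1", "3")
val dataMapSegmentMap = Map("0" -> Set("0"), "1" -> Set("1", "2"))

// Main table segments already covered by some datamap segment.
val covered = dataMapSegmentMap.values.flatten.toSet
// Datamap segments mapped to main table segments that no longer exist
// must be marked for delete.
val invalidDataMapSegments = dataMapSegmentMap.collect {
  case (dmSegment, mapped) if (mapped -- mainTableSegments).nonEmpty => dmSegment
}.toSet
// The next datamap load covers the uncovered main table segments plus any
// still-valid segments that were only reachable through an invalid mapping.
val toLoad = (mainTableSegments -- covered) ++
  invalidDataMapSegments.flatMap(dataMapSegmentMap).filter(mainTableSegments)

println(invalidDataMapSegments) // Set(1): datamap segment 1 is marked for delete
println(toLoad)                 // Set(1.1, 3): loaded as new datamap segment 2

Running the same arithmetic on case 2 ({0 -> (0, 1, 2), 1 -> (3)} with the new main table list {0, 1.1, 3, 4}) yields invalid datamap segment 0 and toLoad = {1.1, 4, 0}, matching the scaladoc.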
