[GitHub] [carbondata] akashrn5 commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

GitBox
akashrn5 commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r277220545
 
 

 ##########
 File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
 ##########
 @@ -91,22 +102,198 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRebuild =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRebuild) {
+        isOverwriteTable = true
+      }
+      queryPlan.queryExecution.optimizedPlan transformDown {
+        case join@Join(l1, l2, jointype, condition) =>
+          // TODO: Support Incremental loading for multiple tables with join - CARBONDATA-3340
+          isOverwriteTable = true
+          join
+      }
+      if (!isOverwriteTable) {
+        // Set main table segments to load for incremental data loading.
+        // Compare main table segments info with datamap table segment map info and load only newly
+        // added segment from main table to datamap table
+        if (!setSegmentsBasedOnMapping()) {
+          return
+        }
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
         internalOptions = Map.empty,
         partition = Map.empty)
 
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+      } finally {
+        unsetMainTableSegments()
+      }
+    }
+  }
+
+  /**
+   * This method will compare mainTable and dataMapTable segment List and loads only newly added
+   * segment from main table to dataMap table.
+   * In case if mainTable is compacted, then based on dataMap to mainTables segmentMapping, dataMap
+   * will be loaded
+   *
+   * Eg: Consider mainTableSegmentList: {0, 1, 2}, dataMapToMainTable segmentMap: { 0 -> 0, 1-> 1,2}
+   * If (1, 2) segments of main table are compacted to 1.1 and new segment (3) is loaded to
+   * main table, then mainTableSegmentList will be updated to{0, 1.1, 3}.
+   * In this case, segment (1) of dataMap table will be marked for delete, and new segment
+   * {2 -> 1.1, 3} will be loaded to dataMap table
 
 Review comment:
   add more cases in comments

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services