akashrn5 commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r277220545

##########
File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
##########

@@ -91,22 +102,198 @@ class MVDataMapProvider(
     val queryPlan = SparkSQLUtil.execute(
       sparkSession.sql(updatedQuery).queryExecution.analyzed,
       sparkSession).drop("preAgg")
-    val header = logicalPlan.output.map(_.name).mkString(",")
+    var isOverwriteTable = false
+    val isFullRebuild =
+      if (null != dataMapSchema.getProperties.get("full_refresh")) {
+        dataMapSchema.getProperties.get("full_refresh").toBoolean
+      } else {
+        false
+      }
+    if (isFullRebuild) {
+      isOverwriteTable = true
+    }
+    queryPlan.queryExecution.optimizedPlan transformDown {
+      case join@Join(l1, l2, jointype, condition) =>
+        // TODO: Support Incremental loading for multiple tables with join - CARBONDATA-3340
+        isOverwriteTable = true
+        join
+    }
+    if (!isOverwriteTable) {
+      // Set main table segments to load for incremental data loading.
+      // Compare main table segments info with datamap table segment map info and load only
+      // newly added segments from the main table to the datamap table.
+      if (!setSegmentsBasedOnMapping()) {
+        return
+      }
+    }
+    val dataMapTable = CarbonTable
+      .buildFromTablePath(identifier.getTableName,
+        identifier.getDatabaseName,
+        identifier.getTablePath,
+        identifier.getTableId)
+    val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+      .filter { column =>
+        !column.getColumnName
+          .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+      }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
     val loadCommand = CarbonLoadDataCommand(
       databaseNameOp = Some(identifier.getDatabaseName),
       tableName = identifier.getTableName,
       factPathFromUser = null,
       dimFilesPath = Seq(),
       options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = true,
+      isOverwriteTable,
       inputSqlString = null,
       dataFrame = Some(queryPlan),
       updateModel = None,
       tableInfoOp = None,
       internalOptions = Map.empty,
       partition = Map.empty)
-    SparkSQLUtil.execute(loadCommand, sparkSession)
+    try {
+      SparkSQLUtil.execute(loadCommand, sparkSession)
+    } catch {
+      case ex: Exception =>
+        DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+        LOGGER.error("Data Load failed for DataMap: ", ex)
+    } finally {
+      unsetMainTableSegments()
+    }
+  }
+
+  /**
+   * Compares the main table and dataMap table segment lists and loads only the newly added
+   * segments from the main table to the dataMap table.
+   * If the main table has been compacted, the dataMap is reloaded based on the
+   * dataMap-to-mainTable segment mapping.
+   *
+   * Eg: Consider mainTableSegmentList: {0, 1, 2} and dataMapToMainTable segmentMap:
+   * {0 -> (0), 1 -> (1, 2)}.
+   * If segments (1, 2) of the main table are compacted to 1.1 and a new segment (3) is loaded
+   * to the main table, then mainTableSegmentList becomes {0, 1.1, 3}.
+   * In this case, segment (1) of the dataMap table is marked for delete, and a new dataMap
+   * segment {2 -> (1.1, 3)} is loaded to the dataMap table.

 Review comment:
   add more cases in comments
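To make the requested cases concrete, the following is a minimal, self-contained Scala sketch of the segment reconciliation the comment describes. The names (SegmentMappingSketch, reconcile, mainTableSegments, dataMapSegmentMap) are hypothetical and are not part of CarbonData's API; the actual implementation works against segment status metadata rather than plain string sets.

    // Hypothetical, simplified sketch of the segment reconciliation described in the scaladoc.
    // Segment ids are plain strings ("0", "1", "1.1", ...).
    object SegmentMappingSketch {

      /**
       * @param mainTableSegments currently valid segments of the main table, e.g. Set("0", "1.1", "3")
       * @param dataMapSegmentMap dataMap segment id -> main table segments it was built from,
       *                          e.g. Map("0" -> Set("0"), "1" -> Set("1", "2"))
       * @return (dataMap segments to mark for delete, main table segments still to be loaded)
       */
      def reconcile(
          mainTableSegments: Set[String],
          dataMapSegmentMap: Map[String, Set[String]]): (Set[String], Set[String]) = {
        // A dataMap segment stays valid only if every main table segment it maps to still
        // exists; compaction replaces "1" and "2" with "1.1", so the dataMap segment built
        // on them becomes stale.
        val (valid, stale) = dataMapSegmentMap.partition {
          case (_, mapped) => mapped.subsetOf(mainTableSegments)
        }
        val alreadyCovered = valid.values.flatten.toSet
        // Everything in the main table not covered by a valid dataMap segment must be
        // (re)loaded: the compacted segment "1.1" and the new segment "3" in the example.
        val toLoad = mainTableSegments -- alreadyCovered
        (stale.keySet, toLoad)
      }
    }

    // Example from the comment: main table {0, 1, 2} compacted to {0, 1.1} plus new segment 3.
    // SegmentMappingSketch.reconcile(
    //   Set("0", "1.1", "3"),
    //   Map("0" -> Set("0"), "1" -> Set("1", "2")))
    // == (Set("1"), Set("1.1", "3"))

Under these assumptions, the sketch reproduces the documented case: dataMap segment 1 is marked for delete, and the next dataMap segment is built from main table segments 1.1 and 3.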