ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r279248051

##########
File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
##########

@@ -91,29 +115,280 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRefresh =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRefresh) {
+        isOverwriteTable = true
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      // Get new load for datamap by loading specified main table segments
+      val newDataMapLoad = incrementalBuild(isOverwriteTable,
+        dataMapTable.getAbsoluteTableIdentifier,
+        dataMapTable.getMetadataPath)
+      if (newDataMapLoad.isEmpty) {
+        return false
+      }
+      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map("mergedSegmentName" -> newDataMapLoad),
         partition = Map.empty)
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+          return false
+      } finally {
+        unsetMainTableSegments()
+      }
     }
+    true
   }

+  /**
+   * This method builds a new load for the datamap by loading the specified segment data and
+   * returns the new load name for the datamap table load.
+   */
   @throws[IOException]
-  override def incrementalBuild(
-      segmentIds: Array[String]): Unit = {
-    throw new UnsupportedOperationException
+  override def incrementalBuild(isOverwriteTable: Boolean,
+      dataMapTableAbsoluteTableIdentifier: AbsoluteTableIdentifier,
+      dataMapTableMetadataPath: String): String = {
+    var loadMetaDataDetails = SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath)
+    var newLoadName: String = ""
+    var segmentMap: String = ""
+    val segmentStatusManager = new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier)
+    // Acquire table status lock to handle concurrent dataloading
+    val carbonLock: ICarbonLock = segmentStatusManager.getTableStatusLock

Review comment:
   Please move it to the parent class and add an abstract method rebuildInternal(String loadName, Map<String, Set<Segment>>).
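One possible shape for that refactor is sketched below: the table status locking and the loadName/segmentMap bookkeeping live in the parent provider's incrementalBuild, and each concrete provider implements only the suggested rebuildInternal hook. Apart from rebuildInternal itself and the lockWithRetries/unlock calls (which mirror CarbonData's ICarbonLock interface), every name here is a hypothetical stand-in rather than the project's actual API:

```scala
import java.io.IOException

// Stand-ins so the sketch is self-contained; the real project types would be
// org.apache.carbondata.core.locks.ICarbonLock and CarbonData's Segment class.
case class Segment(segmentNo: String)

trait TableStatusLock {
  def lockWithRetries(): Boolean
  def unlock(): Boolean
}

abstract class AbstractDataMapProvider {

  // Template method owned by the parent class: guard the datamap table status,
  // work out the new load name and the mainTable -> segments mapping once,
  // then hand the actual data load to the subclass.
  @throws[IOException]
  final def incrementalBuild(isOverwriteTable: Boolean): String = {
    val lock = getTableStatusLock
    if (!lock.lockWithRetries()) {
      throw new IOException("Unable to acquire the table status lock for the datamap table")
    }
    try {
      val newLoadName = createNewLoadName()        // e.g. the next segment id
      val segmentMap = collectMainTableSegments()  // main table -> segments not yet loaded
      if (!isOverwriteTable && segmentMap.values.forall(_.isEmpty)) {
        return "" // nothing new to load; the caller treats "" as "skip this rebuild"
      }
      rebuildInternal(newLoadName, segmentMap)
      newLoadName
    } finally {
      lock.unlock()
    }
  }

  // The abstract hook the comment asks for: load the given main table segments
  // into the new datamap load identified by loadName.
  protected def rebuildInternal(loadName: String,
      segmentMap: Map[String, Set[Segment]]): Unit

  // Hypothetical helpers wrapping the SegmentStatusManager calls from the diff.
  protected def getTableStatusLock: TableStatusLock
  protected def createNewLoadName(): String
  protected def collectMainTableSegments(): Map[String, Set[Segment]]
}
```

With this split, the concurrent-dataload guard is written once in the parent, and the MV provider's incrementalBuild override from the diff would reduce to issuing the CarbonLoadDataCommand inside rebuildInternal.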