ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap [with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r279247186

##########
File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
##########

@@ -91,29 +115,280 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRefresh =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRefresh) {
+        isOverwriteTable = true
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      // Get new load for datamap by loading specified main table segments
+      val newDataMapLoad = incrementalBuild(isOverwriteTable,
+        dataMapTable.getAbsoluteTableIdentifier,
+        dataMapTable.getMetadataPath)
+      if(newDataMapLoad.isEmpty) {
+        return false
+      }
+      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map("mergedSegmentName" -> newDataMapLoad),
         partition = Map.empty)
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+          return false
+      } finally {
+        unsetMainTableSegments()
+      }
     }
+    true
   }

+  /**
+   * This method builds new load for datamap by loading specified segment data and returns
+   * new load name for datamap table load
+   */
   @throws[IOException]
-  override def incrementalBuild(
-      segmentIds: Array[String]): Unit = {
-    throw new UnsupportedOperationException
+  override def incrementalBuild(isOverwriteTable: Boolean,

Review comment:
   Please remove this from the interface, as it is doing nothing there.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]

With regards,
Apache Git Services
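For context on the review comment above, here is a minimal sketch of the shape of the suggested refactor: `incrementalBuild` becomes a private helper of `MVDataMapProvider` rather than a method declared on the provider interface, where non-MV providers would be left with a no-op or `UnsupportedOperationException` implementation. The trait and all signatures below are simplified, hypothetical stand-ins for illustration, not the actual CarbonData API.

```scala
import java.io.IOException

// Hypothetical, simplified stand-in for CarbonData's DataMapProvider: after
// the refactor, the interface declares only operations every provider
// actually supports; incrementalBuild is no longer part of the contract.
trait DataMapProvider {
  def rebuild(): Boolean
}

class MVDataMapProvider extends DataMapProvider {

  override def rebuild(): Boolean = {
    // The MV provider decides internally whether to do a full or an
    // incremental (segment-wise) load, as in the diff above.
    val newDataMapLoad = incrementalBuild(isOverwriteTable = false)
    // An empty load name means there were no new main-table segments to load.
    newDataMapLoad.nonEmpty
  }

  /**
   * Builds a new load for the datamap from the specified main-table segments
   * and returns the new load (segment) name, or an empty string when there is
   * nothing to load. Private because it is an implementation detail of the MV
   * provider, not something every datamap provider can support.
   */
  @throws[IOException]
  private def incrementalBuild(isOverwriteTable: Boolean): String = {
    "" // placeholder body; the real method would load the mapped segments here
  }
}
```

Keeping segment-wise rebuild out of the interface means other providers (for example, index datamaps) are not forced to carry an override that does nothing.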