Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163765811

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---

```
@@ -900,4 +904,75 @@ object PreAggregateUtil {
         aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
       CarbonCommonConstants.DEFAULT_CHARSET)
   }
+
+  def commitDataMaps(
+      carbonLoadModel: CarbonLoadModel,
+      uuid: String = "")(sparkSession: SparkSession) {
+    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+      .getDataMapSchemaList
+    if (dataMapSchemas.size() >= 1) {
```

--- End diff --

Change the condition to `> 0`, i.e. `if (dataMapSchemas.size() > 0) {`.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163765845

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---

```
@@ -900,4 +904,75 @@ object PreAggregateUtil {
         aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
       CarbonCommonConstants.DEFAULT_CHARSET)
   }
+
+  def commitDataMaps(
+      carbonLoadModel: CarbonLoadModel,
+      uuid: String = "")(sparkSession: SparkSession) {
+    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+      .getDataMapSchemaList
+    if (dataMapSchemas.size() >= 1) {
+      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
+        dataMapSchema =>
```

--- End diff --

Move `dataMapSchema =>` up to the previous line, next to the opening brace: `flatMap { dataMapSchema =>`.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163767814

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---

```
@@ -900,4 +904,75 @@ object PreAggregateUtil {
         aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
       CarbonCommonConstants.DEFAULT_CHARSET)
   }
+
+  def commitDataMaps(
+      carbonLoadModel: CarbonLoadModel,
+      uuid: String = "")(sparkSession: SparkSession) {
+    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+      .getDataMapSchemaList
+    if (dataMapSchemas.size() >= 1) {
+      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
+        dataMapSchema =>
+          LOGGER.info(
+            s"Renaming table status file for ${ dataMapSchema.getRelationIdentifier.toString }")
+          val carbonTable = CarbonEnv
+            .getCarbonTable(Option(dataMapSchema.getRelationIdentifier.getDatabaseName),
+              dataMapSchema.getRelationIdentifier.getTableName)(sparkSession)
+          val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+            carbonTable.getTablePath)
+          val oldTableSchemaPath = carbonTablePath.getTableStatusFilePath(uuid)
+          val newTableSchemaPath = carbonTablePath.getTableStatusFilePath()
+          if (renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)) {
+            Some(carbonTable)
+          } else {
+            None
+          }
+      }
+      if (renamedDataMaps.lengthCompare(dataMapSchemas.size()) < 0) {
+        LOGGER.warn("Reverting table status file to original state")
+        renamedDataMaps.foreach {
+          carbonTable =>
+            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+              carbonTable.getTablePath)
+            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath() + "_backup_" + uuid
+            val tableSchemaPath = carbonTablePath.getTableStatusFilePath()
+            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+        }
+        sys.error("Failed to update table status for pre-aggregate table")
+      }
+      cleanUpStaleTableStatusFiles(renamedDataMaps, uuid)(sparkSession)
+    }
+  }
+
+  private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
+      newTableSchemaPath: String, uuid: String) = {
+    val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
+    val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
+    val backupCreated = if (newCarbonFile.exists()) {
+      newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)
+    } else {
+      true
+    }
+    if (oldCarbonFile.exists() && backupCreated) {
+      oldCarbonFile.renameForce(newTableSchemaPath)
+    } else {
+      false
+    }
+  }
+
```

--- End diff --

Better add the code as below:

```scala
private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
    newTableSchemaPath: String, uuid: String) = {
  val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
  val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
  if (newCarbonFile.exists() && oldCarbonFile.exists()) {
    if (newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)) {
      oldCarbonFile.renameForce(newTableSchemaPath)
    } else {
      false
    }
  } else {
    false
  }
}
```

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163768054

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---

```
@@ -900,4 +904,75 @@ object PreAggregateUtil {
[... same commitDataMaps and renameDataMapTableStatusFiles additions as quoted in the previous comment ...]
+
+  def cleanUpStaleTableStatusFiles(carbonTables: Seq[CarbonTable],
+      uuid: String)(sparkSession: SparkSession): Unit = {
+    carbonTables.foreach {
```

--- End diff --

Delete the created segment folder as well.
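As an illustration of that suggestion, here is a rough sketch of the extra cleanup inside `cleanUpStaleTableStatusFiles`; `getSegmentDirPath` is a hypothetical helper (not an actual CarbonData API) for resolving the stale segment's directory, and the `CarbonUtil.deleteFoldersAndFiles` usage is an assumption about the available utility:

```scala
// Hedged sketch, not the committed code: in addition to removing the stale
// table status file, delete the segment folder created for the aborted load.
carbonTables.foreach { carbonTable =>
  // Hypothetical helper: resolves the on-disk directory of the stale segment.
  val segmentDir = getSegmentDirPath(carbonTable, uuid)
  val segmentFolder = FileFactory.getCarbonFile(segmentDir)
  if (segmentFolder.exists()) {
    // Assumed utility that deletes the folder and its contents recursively.
    CarbonUtil.deleteFoldersAndFiles(segmentFolder)
  }
}
```

---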
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163768422

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---

```
@@ -541,9 +543,11 @@ object CarbonDataRDDFactory {
           carbonLoadModel,
           loadStatus,
           newEntryLoadStatus,
-          overwriteTable)
+          overwriteTable,
+          uniqueTableStatusId)
+      PreAggregateUtil.commitDataMaps(carbonLoadModel, uniqueTableStatusId)(sqlContext.sparkSession)
```

--- End diff --

Use try/catch and mark the load status as failed if anything goes wrong while committing the datamap.
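A minimal sketch of the suggested error handling, assuming the names available in the surrounding `CarbonDataRDDFactory` code; the uuid-aware `updateTableStatusForFailure` variant used here is an assumption, not a confirmed API:

```scala
// Hedged sketch: wrap the datamap commit so any failure marks the main table
// load as failed instead of leaving a half-committed segment behind.
try {
  PreAggregateUtil.commitDataMaps(carbonLoadModel, uniqueTableStatusId)(sqlContext.sparkSession)
} catch {
  case e: Exception =>
    LOGGER.error(e, "Data map commit failed; marking main table load as failed")
    // Hypothetical uuid-aware variant of CarbonLoaderUtil.updateTableStatusForFailure.
    CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
    throw e
}
```

---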
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163768485

--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---

```
@@ -208,6 +207,8 @@
 class CarbonSessionStateBuilder(sparkSession: SparkSession,
     parentState: Option[SessionState] = None)
   extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  CarbonSession.initListeners()
```

--- End diff --

Remove it.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163768617

--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---

```
@@ -148,7 +148,7 @@ public static void deleteStorePath(String path) {
    * @throws IOException
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
-      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
```

--- End diff --

Add a new method instead of changing the existing signature.
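One way to read "add a new method": keep the existing signature as a thin overload that delegates to the new uuid-aware one, so current callers stay untouched. A sketch in the style of the file under review (the delegation shape and throws clause are assumptions):

```java
// Hedged sketch: preserve the old entry point and delegate to the new
// uuid-aware overload with an empty uuid.
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
    CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
    throws IOException {
  return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
}
```

---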
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1887/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3119/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1890/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3123/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3127/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1894/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1781

SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3098/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1781

SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3101/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1905/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3138/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1781

SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3105/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1781

SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3111/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1781

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1980/

---