[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

99 messages

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163765811
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -900,4 +904,75 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  def commitDataMaps(
    +      carbonLoadModel: CarbonLoadModel,
    +      uuid: String = "")(sparkSession: SparkSession) {
    +    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
    +      .getDataMapSchemaList
    +    if (dataMapSchemas.size() >= 1) {
    --- End diff --
   
    Change this to `> 0`.
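A minimal sketch of the suggested check. It assumes the schema list is a `java.util.List`, as `getDataMapSchemaList` appears to return in the diff; `hasDataMaps` is a hypothetical helper name. For any list, `size() > 0` means the same as the original `size() >= 1`, but it states the intent ("at least one datamap") more directly.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

object DataMapCheck {
  // Hypothetical helper: true when at least one datamap schema is registered.
  // Equivalent to the original `size() >= 1`.
  def hasDataMaps[T](dataMapSchemas: JList[T]): Boolean =
    dataMapSchemas.size() > 0

  def main(args: Array[String]): Unit = {
    val schemas = new JArrayList[String]()
    println(hasDataMaps(schemas)) // false
    schemas.add("agg_table")
    println(hasDataMaps(schemas)) // true
  }
}
```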


---

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163765845
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -900,4 +904,75 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  def commitDataMaps(
    +      carbonLoadModel: CarbonLoadModel,
    +      uuid: String = "")(sparkSession: SparkSession) {
    +    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
    +      .getDataMapSchemaList
    +    if (dataMapSchemas.size() >= 1) {
    +      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
    +        dataMapSchema =>
    --- End diff --
   
    Move `dataMapSchema =>` up to the line above, i.e. `flatMap { dataMapSchema =>`.
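The comment asks for the usual Scala style of keeping the lambda parameter on the line that opens the block. A small sketch of the same `flatMap` with `Some`/`None` pattern used in the diff, with integers standing in for datamap schemas (`keepEven` is a hypothetical example, not code from the PR):

```scala
object LambdaStyle {
  // Preferred layout: the parameter sits on the same line as `flatMap {`.
  // Returning Some/None from flatMap keeps only the matching elements,
  // just as the diff keeps only the tables whose status file was renamed.
  def keepEven(xs: Seq[Int]): Seq[Int] =
    xs.flatMap { x =>
      if (x % 2 == 0) Some(x) else None
    }

  def main(args: Array[String]): Unit =
    println(keepEven(Seq(1, 2, 3, 4))) // List(2, 4)
}
```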


---

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163767814
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -900,4 +904,75 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  def commitDataMaps(
    +      carbonLoadModel: CarbonLoadModel,
    +      uuid: String = "")(sparkSession: SparkSession) {
    +    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
    +      .getDataMapSchemaList
    +    if (dataMapSchemas.size() >= 1) {
    +      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
    +        dataMapSchema =>
    +          LOGGER.info(
    +            s"Renaming table status file for ${ dataMapSchema.getRelationIdentifier.toString }")
    +          val carbonTable = CarbonEnv
    +            .getCarbonTable(Option(dataMapSchema.getRelationIdentifier.getDatabaseName),
    +              dataMapSchema.getRelationIdentifier.getTableName)(sparkSession)
    +          val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +            carbonTable.getTablePath)
    +          val oldTableSchemaPath = carbonTablePath.getTableStatusFilePath(uuid)
    +          val newTableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +          if (renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)) {
    +            Some(carbonTable)
    +          } else {
    +            None
    +          }
    +      }
    +      if (renamedDataMaps.lengthCompare(dataMapSchemas.size()) < 0) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          carbonTable =>
    +            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +              carbonTable.getTablePath)
    +            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath() + "_backup_" + uuid
    +            val tableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +      cleanUpStaleTableStatusFiles(renamedDataMaps, uuid)(sparkSession)
    +    }
    +  }
    +
    +  private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
    +      newTableSchemaPath: String, uuid: String) = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
    +    val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
    +    val backupCreated = if (newCarbonFile.exists()) {
    +      newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)
    +    } else {
    +      true
    +    }
    +    if (oldCarbonFile.exists() && backupCreated) {
    +      oldCarbonFile.renameForce(newTableSchemaPath)
    +    } else {
    +      false
    +    }
    +  }
    +
    --- End diff --
   
    Better to write it as below:
   
    ```scala
    private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
        newTableSchemaPath: String, uuid: String): Boolean = {
      val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
      val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
      if (newCarbonFile.exists() && oldCarbonFile.exists()) {
        if (newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)) {
          oldCarbonFile.renameForce(newTableSchemaPath)
        } else {
          false
        }
      } else {
        false
      }
    }
    ```
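A local-filesystem sketch of the suggested backup-then-rename step, plus the all-or-nothing rollback that `commitDataMaps` performs across child tables. It assumes `java.nio.file.Files.move` as a stand-in for `FileFactory`/`CarbonFile.renameForce`; the `tablestatus`, `tablestatus_<uuid>` and `_backup_<uuid>` file names are illustrative, not CarbonData's exact layout.

```scala
import java.nio.file.{Files, Path}
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import scala.util.Try

object StatusFileCommit {
  // Hypothetical backup name, mirroring the "_backup_" + uuid string in the diff.
  def backupPath(current: Path, uuid: String): Path =
    current.resolveSibling(current.getFileName.toString + "_backup_" + uuid)

  // Same shape as the suggestion: both files must exist; back up the current
  // status file, then move the uuid-specific one into its place.
  def renameWithBackup(newStatus: Path, current: Path, uuid: String): Boolean =
    if (!Files.exists(current) || !Files.exists(newStatus)) {
      false
    } else {
      val backup = backupPath(current, uuid)
      val backedUp = Try(Files.move(current, backup, REPLACE_EXISTING)).isSuccess
      if (!backedUp) {
        false
      } else {
        val moved = Try(Files.move(newStatus, current, REPLACE_EXISTING)).isSuccess
        // Best-effort restore of the original if the second step failed.
        if (!moved) Try(Files.move(backup, current, REPLACE_EXISTING))
        moved
      }
    }

  // All-or-nothing commit: if any table fails, restore the backups of the
  // tables that were already switched to the new status file.
  def commitAll(tableDirs: Seq[Path], uuid: String): Boolean = {
    val committed = tableDirs.filter { dir =>
      renameWithBackup(dir.resolve("tablestatus_" + uuid), dir.resolve("tablestatus"), uuid)
    }
    if (committed.size < tableDirs.size) {
      committed.foreach { dir =>
        val current = dir.resolve("tablestatus")
        Files.move(backupPath(current, uuid), current, REPLACE_EXISTING)
      }
      false
    } else {
      true
    }
  }
}
```

Requiring both files to exist, as the suggestion does, means a table with no existing status file is treated as a failure; whether that is acceptable for a first load depends on how the child tables are initialised.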


---

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163768054
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -900,4 +904,75 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  def commitDataMaps(
    +      carbonLoadModel: CarbonLoadModel,
    +      uuid: String = "")(sparkSession: SparkSession) {
    +    val dataMapSchemas = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
    +      .getDataMapSchemaList
    +    if (dataMapSchemas.size() >= 1) {
    +      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
    +        dataMapSchema =>
    +          LOGGER.info(
    +            s"Renaming table status file for ${ dataMapSchema.getRelationIdentifier.toString }")
    +          val carbonTable = CarbonEnv
    +            .getCarbonTable(Option(dataMapSchema.getRelationIdentifier.getDatabaseName),
    +              dataMapSchema.getRelationIdentifier.getTableName)(sparkSession)
    +          val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +            carbonTable.getTablePath)
    +          val oldTableSchemaPath = carbonTablePath.getTableStatusFilePath(uuid)
    +          val newTableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +          if (renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)) {
    +            Some(carbonTable)
    +          } else {
    +            None
    +          }
    +      }
    +      if (renamedDataMaps.lengthCompare(dataMapSchemas.size()) < 0) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          carbonTable =>
    +            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +              carbonTable.getTablePath)
    +            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath() + "_backup_" + uuid
    +            val tableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +      cleanUpStaleTableStatusFiles(renamedDataMaps, uuid)(sparkSession)
    +    }
    +  }
    +
    +  private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
    +      newTableSchemaPath: String, uuid: String) = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
    +    val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
    +    val backupCreated = if (newCarbonFile.exists()) {
    +      newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)
    +    } else {
    +      true
    +    }
    +    if (oldCarbonFile.exists() && backupCreated) {
    +      oldCarbonFile.renameForce(newTableSchemaPath)
    +    } else {
    +      false
    +    }
    +  }
    +
    +  def cleanUpStaleTableStatusFiles(carbonTables: Seq[CarbonTable],
    +      uuid: String)(sparkSession: SparkSession): Unit = {
    +    carbonTables.foreach {
    --- End diff --
   
    Delete the created segment folder as well.
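A sketch of what that cleanup could look like for a load that is being rolled back: remove the uuid-suffixed status and backup files, then delete the segment directory tree. It uses `java.nio.file` on a local filesystem; `cleanUp`, its parameters and the file names are hypothetical, and the real segment path would come from CarbonData's own path helpers.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

object StaleCleanup {
  // Delete a directory tree. Files.walk yields parents before children,
  // so deleting in reverse order removes children first.
  def deleteRecursively(root: Path): Unit =
    if (Files.exists(root)) {
      val stream = Files.walk(root)
      try {
        val paths = ArrayBuffer.empty[Path]
        val it = stream.iterator()
        while (it.hasNext) paths += it.next()
        paths.reverseIterator.foreach(p => Files.delete(p))
      } finally stream.close()
    }

  // Hypothetical rollback cleanup for one child table.
  def cleanUp(tableDir: Path, segmentDir: Path, uuid: String): Unit = {
    Files.deleteIfExists(tableDir.resolve("tablestatus_" + uuid))
    Files.deleteIfExists(tableDir.resolve("tablestatus_backup_" + uuid))
    deleteRecursively(segmentDir)
  }
}
```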


---

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163768422
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -541,9 +543,11 @@ object CarbonDataRDDFactory {
               carbonLoadModel,
               loadStatus,
               newEntryLoadStatus,
    -          overwriteTable)
    +          overwriteTable,
    +          uniqueTableStatusId)
    +      PreAggregateUtil.commitDataMaps(carbonLoadModel, uniqueTableStatusId)(sqlContext.sparkSession)
    --- End diff --
   
    Use a try/catch and mark the load status as failed if any issue occurs while committing the datamaps.
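A minimal sketch of the requested pattern: wrap the datamap commit and turn any non-fatal error into a failed load status instead of letting it escape after the main table status was written. `LoadStatus`, `commitDataMapsSafely` and the function parameters are hypothetical stand-ins for what `CarbonDataRDDFactory` tracks.

```scala
import scala.util.control.NonFatal

object CommitWithStatus {
  // Hypothetical load status, standing in for CarbonData's own status values.
  sealed trait LoadStatus
  case object LoadSuccess extends LoadStatus
  case object LoadFailure extends LoadStatus

  // Run the commit; log and report failure on any non-fatal exception.
  def commitDataMapsSafely(commit: () => Unit, log: String => Unit): LoadStatus =
    try {
      commit()
      LoadSuccess
    } catch {
      case NonFatal(e) =>
        log(s"Failed to commit pre-aggregate datamaps: ${e.getMessage}")
        LoadFailure
    }

  def main(args: Array[String]): Unit = {
    println(commitDataMapsSafely(() => (), msg => println(msg))) // LoadSuccess
    println(commitDataMapsSafely(() => sys.error("rename failed"), msg => println(msg)))
  }
}
```

`NonFatal` deliberately lets fatal errors such as `OutOfMemoryError` propagate rather than recording them as an ordinary load failure.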


---

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163768485
 
    --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
    @@ -208,6 +207,8 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
         parentState: Option[SessionState] = None)
       extends HiveSessionStateBuilder(sparkSession, parentState) {
     
    +  CarbonSession.initListeners()
    --- End diff --
   
    Remove this call.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1781: [CARBONDATA-2012] Add support to load pre-agg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163768617
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -148,7 +148,7 @@ public static void deleteStorePath(String path) {
        * @throws IOException
        */
       public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
    -      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
    +      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
    --- End diff --
   
    Add a new method instead of changing the signature of this one.
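One way to read this suggestion is an overload: keep the existing signature for current callers and add a new method that takes the `uuid`. A simplified Scala sketch; `Entry`, the `String` return value and the `tablestatus_<uuid>` naming are hypothetical stand-ins for `LoadMetadataDetails`, `CarbonLoadModel` and the real status-file path.

```scala
object LoaderUtilSketch {
  // Simplified stand-in for LoadMetadataDetails.
  final case class Entry(loadName: String)

  // New method: records the entry in a uuid-specific status file when uuid is non-empty.
  def recordNewLoadMetadata(entry: Entry, loadStartEntry: Boolean,
      insertOverwrite: Boolean, uuid: String): String = {
    val statusFile = if (uuid.isEmpty) "tablestatus" else s"tablestatus_$uuid"
    s"${entry.loadName} -> $statusFile"
  }

  // Existing signature kept intact; delegates with an empty uuid, so
  // current callers compile and behave as before.
  def recordNewLoadMetadata(entry: Entry, loadStartEntry: Boolean,
      insertOverwrite: Boolean): String =
    recordNewLoadMetadata(entry, loadStartEntry, insertOverwrite, "")
}
```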


---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1887/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3119/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1890/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3123/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3127/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1894/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3098/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3101/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1905/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3138/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3105/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3111/



---

[GitHub] carbondata issue #1781: [CARBONDATA-2012] Add support to load pre-aggregate ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1781
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1980/



---