[GitHub] carbondata pull request #2109: [WIP] Partition preaggregate support


[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178758267
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         var locks = List.empty[ICarbonLock]
         val uniqueId = System.currentTimeMillis().toString
    +    val childCommands = operationContext.getProperty("dropPartitionCommands")
    --- End diff --
   
    Remove as unused


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178761202
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
             table.getDatabaseName,
             table.getTableName,
             locksToBeAcquired)(sparkSession)
    +      // If flow is for child table then get the uuid from operation context.
    +      // If flow is for parent table then generate uuid for child flows and set the uuid to ""
    +      // for parent table
    +      // If normal table then set uuid to "".
    +      val uuid = if (table.isChildDataMap) {
    +        Option(operationContext.getProperty("uuid")).getOrElse("").toString
    --- End diff --
   
    In the isChildDataMap case it is mandatory to get the UUID set by the parent table. If it is not set, log a message here, or better, throw an exception from here.
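
    A minimal sketch of the suggested validation, reusing the names from the
    diff above (the exception type and message are illustrative, not from the
    PR, and the else branch is simplified to the parent/normal-table case):

        val uuid = if (table.isChildDataMap) {
          // mandatory: the parent table flow must already have set the uuid
          Option(operationContext.getProperty("uuid")).map(_.toString).getOrElse(
            throw new RuntimeException(
              "uuid not set by parent table flow for child datamap commit"))
        } else {
          // parent table or normal table flow
          ""
        }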


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178719972
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
    @@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
         segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
       }
     
    +  test("test auto compaction on aggregate table") {
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
    +    segmentNamesSum.sorted should equal  (Array("0", "0.1", "1", "2", "3"))
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    --- End diff --
   
    1. Wrap the test case body in try/finally. In case of any failure the property needs to be reset to its default value; otherwise the following test cases might fail.
    2. Replace "false" with the default value from CarbonCommonConstants.
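
    A minimal sketch of the suggested try/finally pattern for the test above
    (assuming CarbonCommonConstants exposes a matching default constant such
    as DEFAULT_ENABLE_AUTO_LOAD_MERGE):

        test("test auto compaction on aggregate table") {
          CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
          try {
            sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
            sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
            sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
            sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
            val segmentNamesSum = sql("show segments for table maintable_preagg_sum")
              .collect().map(_.get(0).toString)
            segmentNamesSum.sorted should equal(Array("0", "0.1", "1", "2", "3"))
          } finally {
            // restore the default even if an assertion above fails, so the
            // following test cases are not affected
            CarbonProperties.getInstance().addProperty(
              CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
              CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
          }
        }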


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178719497
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -130,15 +130,20 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
               loadModel.getTablePath());
           newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
         }
    +    OperationContext operationContext = (OperationContext) getOperationContext();
    +    String uuid = "";
    +    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
    +        operationContext != null) {
    +      uuid = operationContext.getProperty("uuid").toString();
    +    }
         CarbonLoaderUtil
             .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
                 true);
         CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
         long segmentSize = CarbonLoaderUtil
             .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
         if (segmentSize > 0 || overwriteSet) {
    -      Object operationContext = getOperationContext();
    -      if (operationContext != null) {
    +      if (operationContext != null && carbonTable.hasAggregationDataMap()) {
             ((OperationContext) operationContext)
    --- End diff --
   
    Remove the typecast here; operationContext is already declared as OperationContext above, so it can be used directly.


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178727711
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,488 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    +
    +  val testData = s"$resourcesPath/sample.csv"
    +
    +  override def beforeAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("create database partition_preaggregate")
    +    sql("use partition_preaggregate")
    +    sql(
    +      """
    +        | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(
    +      """
    +        | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +  }
    +
    +  override def afterAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("use default")
    +  }
    +
    +  // Create aggregate table on partition with partition column in aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as aggregation") {
    +    sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column in projection and aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as projection") {
    +    sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column as group by.
    +  test("test preaggregate table creation on partition table with partition col as group by") {
    +    sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition without partition column.
    +  test("test preaggregate table creation on partition table without partition column") {
    +    sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +  
    +  test("test data correction in aggregate table when partition column is used") {
    +    sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
    +    checkAnswer(sql("select * from maintable_p1"),
    +      Seq(Row(1,31,31),
    +        Row(2,27,27),
    +        Row(3,70,35),
    +        Row(4,26,26),
    +        Row(4,29,29)))
    +    preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
    +    sql("drop datamap p1 on table maintable")
    +  }
    +
    +  test("test data correction in aggregate table when partition column is not used") {
    +    sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
    +    checkAnswer(sql("select * from maintable_p2"),
    +      Seq(Row(1,31),
    +        Row(2,27),
    +        Row(3,35),
    +        Row(4,29)))
    +    preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
    +    sql("drop datamap p2 on table maintable")
    +  }
    +
    +  test("test data correction with insert overwrite") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2014,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))
    +    checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1)))
    +  }
    +
    +  test("test data correction with insert overwrite on different value") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2015,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
    --- End diff --
   
    After the insert overwrite operation only one row should remain (Row("v",2015,1,1)). Is my understanding correct?


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178767625
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    --- End diff --
   
    Write a comment explaining when uuid will be empty and when it will be non-empty.
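
    A possible wording, based on how the uuid is set elsewhere in this PR
    (treat it as a suggestion rather than the final comment):

        // uuid is non-empty only when committing a child (datamap) table,
        // where the parent table flow generated it; for a parent table or a
        // normal table the uuid is "" and no backup postfix is appended.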


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178780592
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    --- End diff --
   
    Change the log level to info or error.


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178787300
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
    +      partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +    } else {
    +      Seq()
    +    }
    +    // Generate child table partition columns in the same order as the parent table.
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if parentPartitionColumns
    +        .exists(parentCol =>
    +          parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +        (PartitionerField(field.name.get,
    +          field.dataType,
    +          field.columnComment), parentPartitionColumns
    +          .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
    +    }.toSeq.sortBy(_._2).map(_._1)
    --- End diff --
   
    I think the sortBy and map operations are not required here, as the child datamap column ordering already follows the parent table partition column ordering.
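
    A minimal sketch of the suggested simplification, keeping the names from
    the diff above (assuming the ordering claim holds, the index pairing and
    the final sortBy/map can be dropped):

        val partitionerFields = fieldRelationMap.collect {
          case (field, dataMapField) if parentPartitionColumns.exists(parentCol =>
            parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
            dataMapField.aggregateFunction.isEmpty) =>
            PartitionerField(field.name.get, field.dataType, field.columnComment)
        }.toSeq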


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178784636
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    +        operationContext,
    +        uuid)
    +      if (commitFailed) {
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +
    +      }
    +    }
    +}
    +
    +object AlterTableDropPartitionMetaListener extends OperationEventListener{
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
    +    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
    +    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
    +    val sparkSession = SparkSession.getActiveSession.get
    +    if (parentCarbonTable.hasAggregationDataMap) {
    +      // used as a flag to block direct drop partition on aggregate tables fired by the user
    +      operationContext.setProperty("isInternalDropCall", "true")
    +      // Filter out all the tables which dont have the partition being dropped.
    +      val childTablesWithoutPartitionColumns =
    +        parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
    +          val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          val partitionColExists = partitionsToBeDropped.forall {
    +            partition =>
    +              childColumns.exists { childColumn =>
    +                  childColumn.getAggFunction.isEmpty &&
    +                  childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
    +                    equals(partition)
    +              }
    +          }
    +          !partitionColExists
    +      }
    +      if (childTablesWithoutPartitionColumns.nonEmpty) {
    +        throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" +
    +                                           s" participating in the following datamaps ${
    +          childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
    --- End diff --
   
    Make sure this list is printed to the console output.


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178775449
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    --- End diff --
   
    1. Make this call in a finally block.
    2. We need a mechanism to clean these files up during the clean files operation. **For this we can raise a JIRA to track the issue**
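
    A minimal sketch of point 1, rearranging the code from the diff above so
    the cleanup always runs (only the control flow changes):

        var commitFailed = false
        try {
          val renamedDataMaps = childCommands.takeWhile { childCommand =>
            val childCarbonTable = childCommand.table
            val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
              childCarbonTable.getTablePath, uuid)
            val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
              childCarbonTable.getTablePath)
            renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
          }
          commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
          if (commitFailed) {
            LOGGER.error("Reverting table status file to original state")
            renamedDataMaps.foreach { command =>
              // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
              val tableSchemaPath =
                CarbonTablePath.getTableStatusFilePath(command.table.getTablePath)
              renameDataMapTableStatusFiles(tableSchemaPath + "_backup_" + uuid,
                tableSchemaPath, "")
            }
          }
        } finally {
          // delete the tablestatus files carrying the uuid or backup postfix,
          // on success and on failure alike
          cleanUpStaleTableStatusFiles(childCommands.map(_.table), operationContext, uuid)
        }
        if (commitFailed) {
          sys.error("Failed to update table status for pre-aggregate table")
        }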


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178795973
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,488 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    +
    +  val testData = s"$resourcesPath/sample.csv"
    +
    +  override def beforeAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("create database partition_preaggregate")
    +    sql("use partition_preaggregate")
    +    sql(
    +      """
    +        | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(
    +      """
    +        | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +  }
    +
    +  override def afterAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("use default")
    +  }
    +
    +  // Create aggregate table on partition with partition column in aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as aggregation") {
    +    sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column in projection and aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as projection") {
    +    sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column as group by.
    +  test("test preaggregate table creation on partition table with partition col as group by") {
    +    sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition without partition column.
    +  test("test preaggregate table creation on partition table without partition column") {
    +    sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +  
    +  test("test data correction in aggregate table when partition column is used") {
    +    sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
    +    checkAnswer(sql("select * from maintable_p1"),
    +      Seq(Row(1,31,31),
    +        Row(2,27,27),
    +        Row(3,70,35),
    +        Row(4,26,26),
    +        Row(4,29,29)))
    +    preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
    +    sql("drop datamap p1 on table maintable")
    +  }
    +
    +  test("test data correction in aggregate table when partition column is not used") {
    +    sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
    +    checkAnswer(sql("select * from maintable_p2"),
    +      Seq(Row(1,31),
    +        Row(2,27),
    +        Row(3,35),
    +        Row(4,29)))
    +    preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
    +    sql("drop datamap p2 on table maintable")
    +  }
    +
    +  test("test data correction with insert overwrite") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2014,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))
    +    checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1)))
    +  }
    +
    +  test("test data correction with insert overwrite on different value") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2015,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
    --- End diff --
   
    In case of a partitioned table, overwrite works at the partition level, not at the table level, so only the matching partitions are overwritten.
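
    A short illustration of that semantics using the test above (the comments
    describe the behaviour explained in this reply):

        sql("insert into partitionone values('k',2014,1,1)")
        // only partition (year=2015, month=1, day=1) is overwritten; since it
        // did not exist before, partition (2014,1,1) and its row are untouched
        sql("insert overwrite table partitionone values('v',2015,1,1)")
        checkAnswer(sql("select * from partitionone"),
          Seq(Row("k",2014,1,1), Row("v",2015,1,1)))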


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801685
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
    +      partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +    } else {
    +      Seq()
    +    }
    +    // Generate child table partition columns in the same order as the parent table.
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if parentPartitionColumns
    +        .exists(parentCol =>
    +          parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +        (PartitionerField(field.name.get,
    +          field.dataType,
    +          field.columnComment), parentPartitionColumns
    +          .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
    +    }.toSeq.sortBy(_._2).map(_._1)
    --- End diff --
   
    removed sortBy


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801725
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    +        operationContext,
    +        uuid)
    +      if (commitFailed) {
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionMetaListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
    +    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
    +    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
    +    val sparkSession = SparkSession.getActiveSession.get
    +    if (parentCarbonTable.hasAggregationDataMap) {
    +      // used as a flag to block direct drop partition on aggregate tables fired by the user
    +      operationContext.setProperty("isInternalDropCall", "true")
    +      // Filter out all the child tables which don't have the partition column being dropped.
    +      val childTablesWithoutPartitionColumns =
    +        parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
    +          val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          val partitionColExists = partitionsToBeDropped.forall {
    +            partition =>
    +              childColumns.exists { childColumn =>
    +                  childColumn.getAggFunction.isEmpty &&
    +                  childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
    +                    equals(partition)
    +              }
    +          }
    +          !partitionColExists
    +        }
    +      if (childTablesWithoutPartitionColumns.nonEmpty) {
    +        throw new MetadataProcessException(s"Cannot drop partition as one of the partitions" +
    +                                           s" is not participating in the following datamaps ${
    +          childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
    --- End diff --
   
    Verified
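
To make the behaviour being verified concrete: the meta listener above rejects a drop-partition when any pre-aggregate datamap does not carry the partition column. A hypothetical repro in the style of the project's test suites, where sql(...) and intercept come from QueryTest/ScalaTest; the table, datamap, and partition names are illustrative, not from the PR:

    // Main table partitioned by "city"; the datamap aggregates it away.
    sql("CREATE TABLE sales (id INT, amount INT) PARTITIONED BY (city STRING) " +
      "STORED BY 'carbondata'")
    sql("CREATE DATAMAP agg_sales ON TABLE sales USING 'preaggregate' " +
      "AS SELECT id, SUM(amount) FROM sales GROUP BY id")
    // "city" does not participate in agg_sales, so the listener above is
    // expected to reject the drop with MetadataProcessException.
    intercept[MetadataProcessException] {
      sql("ALTER TABLE sales DROP PARTITION (city='bangalore')")
    }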


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801747
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    [...]
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, for example: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    --- End diff --
   
    done



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801779
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    [...]
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    --- End diff --
   
    Added TODO
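
The TODO presumably sits next to the cleanup call in question; something along these lines, with wording that is illustrative rather than the committed text:

    // TODO: on commit failure, delete the segment directories created for this
    // uuid as well, not only the stale tablestatus files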


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801803
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    [...]
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801858
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
             table.getDatabaseName,
             table.getTableName,
             locksToBeAcquired)(sparkSession)
    +      // If the flow is for a child table, get the uuid from the operation context.
    +      // If the flow is for a parent table, generate a uuid for the child flows and set
    +      // the uuid to "" for the parent table.
    +      // If it is a normal table, set the uuid to "".
    +      val uuid = if (table.isChildDataMap) {
    +        Option(operationContext.getProperty("uuid")).getOrElse("").toString
    --- End diff --
   
    Logged the message
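
For reference, a minimal sketch of the guard being discussed, assuming the fields visible in the diff (table, operationContext) plus a LOGGER in scope; the java.util.UUID usage and the log wording are illustrative, not the committed code:

    import java.util.UUID

    val uuid = if (table.isChildDataMap) {
      // child flow: the parent is expected to have put "uuid" into the context
      val parentUuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
      if (parentUuid.isEmpty) {
        LOGGER.error("uuid not set by parent table for child drop partition flow")
      }
      parentUuid
    } else if (table.hasAggregationDataMap) {
      // parent flow: generate a uuid for the child commands, keep "" for itself
      operationContext.setProperty("uuid", UUID.randomUUID().toString)
      ""
    } else {
      // normal table
      ""
    }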


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801883
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         var locks = List.empty[ICarbonLock]
         val uniqueId = System.currentTimeMillis().toString
    +    val childCommands = operationContext.getProperty("dropPartitionCommands")
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801930
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
    @@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
         segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
       }
     
    +  test("test auto compaction on aggregate table") {
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
    +    segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    --- End diff --
   
    added in afterAll
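
A minimal sketch of what the reset moving into afterAll might look like, assuming the suite mixes in org.scalatest.BeforeAndAfterAll ("false" is the shipped default for ENABLE_AUTO_LOAD_MERGE):

    override def afterAll(): Unit = {
      // restore auto compaction to its default so later suites are unaffected
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    }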


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3565/



---