[GitHub] carbondata pull request #2109: [WIP] Partition preaggregate support


[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178217935
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
    @@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       override def executeCompaction(): Unit = {
         val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         val loadMetaDataDetails = identifySegmentsToBeMerged()
    -    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    // If segmentFile name is specified in load details then segment is for partition table
    +    // therefore the segment file name should be loadName#segmentFileName.segment
    +    val segments = loadMetaDataDetails.asScala.map {
    +      loadDetail => if (loadDetail.getSegmentFile != null) {
    +        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
    +      } else {
    +        loadDetail.getLoadName
    +      }
    --- End diff --
   
    Just use new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString instead of doing this
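
    A minimal sketch of the suggested change, assuming Segment here is the core Segment class whose toString renders the loadName#segmentFileName form when a segment file is present (anything beyond the quoted diff is an assumption):

        // map every load detail to its segment identifier in one place
        val segments = loadMetaDataDetails.asScala.map { loadDetail =>
          new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString
        }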


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178218263
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
         val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
         carbonProperty.addProperty("zookeeper.enable.lock", "false")
    -
    +    currPartitions = if (table.isHivePartitionTable) {
    --- End diff --
   
    This is a metastore call; it is not supposed to be kept under processData.


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178219012
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.spark.sql.execution.command.partition
     
     import java.util
    +import java.util.UUID
    --- End diff --
   
    Do you allow dropping partitions directly on the aggregate table? I mean not through the parent table.


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178219637
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    --- End diff --
   
    just use parentTable.isHivePartitionTable
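
    A minimal sketch of the simplified guard, built only from names visible in the quoted diff plus the suggested isHivePartitionTable check:

        val childPartitionColumns = if (parentTable.isHivePartitionTable) {
          val parentPartitionColumns =
            parentTable.getPartitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
          PreAggregateUtil
            .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
        } else {
          Seq()
        }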


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222428
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    --- End diff --
   
    Why do you need to extract the columns again? Can't you get them from fieldRelationMap?


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222584
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    --- End diff --
   
    Please format it properly.
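
    For reference, one way the else branch could be laid out in line with the surrounding style:

        } else {
          Seq()
        }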


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222846
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
   
    The order of the partition columns is important. Please make sure that the parent partition column order and the child partition column order are the same.
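
    A hedged sketch of one way to enforce that ordering: iterate the parent partition columns first and look up the matching child field, so the resulting sequence inherits the parent order (only names already present in the quoted diff are used):

        val parentPartitionColumns =
          partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
        val partitionerFields = parentPartitionColumns.flatMap { parentCol =>
          fieldRelationMap.collectFirst {
            case (field, dataMapField)
              if dataMapField.aggregateFunction.isEmpty &&
                 dataMapField.columnTableRelationList.get.head.parentColumnName == parentCol =>
              PartitionerField(field.name.get, field.dataType, field.columnComment)
          }
        }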


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178223172
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
   
    Please add some tests to make sure the order of the partition columns is the same as the parent partition columns.
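
    A hedged sketch of such a test, assuming a QueryTest-style suite, that child columns follow the parentTable_column naming convention, and that DESCRIBE FORMATTED lists partition columns in order (all of these are assumptions, not taken from the PR):

        test("pre-agg partition columns keep the parent partition order") {
          sql("drop table if exists partorder")
          sql("create table partorder (id int, name string) " +
            "partitioned by (city string, year int) stored by 'carbondata'")
          sql("create datamap agg_order on table partorder using 'preaggregate' as " +
            "select city, year, sum(id) from partorder group by city, year")
          // the relative position of the two partition columns in the child schema
          // should match the parent: city before year
          val described = sql("describe formatted partorder_agg_order")
            .collect().map(_.getString(0)).mkString(",")
          assert(described.indexOf("partorder_city") < described.indexOf("partorder_year"))
          sql("drop table if exists partorder")
        }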


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178237773
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.spark.sql.execution.command.partition
     
     import java.util
    +import java.util.UUID
    --- End diff --
   
    Not allowed. Partitions cannot be dropped directly on the aggregate table; the drop has to go through the parent table.
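
    A minimal sketch of the kind of guard this implies, assuming the CarbonTable API exposes a child-table check such as isChildDataMap (the exact method and message are assumptions):

        // Reject ALTER TABLE ... DROP PARTITION issued directly on a pre-aggregate
        // (child) table; partitions have to be dropped through the parent table.
        if (table.isChildDataMap) {
          throw new UnsupportedOperationException(
            "Cannot drop partition directly on a pre-aggregate table")
        }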


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178237981
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
         val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
         carbonProperty.addProperty("zookeeper.enable.lock", "false")
    -
    +    currPartitions = if (table.isHivePartitionTable) {
    --- End diff --
   
    Actually I had to move getPartitions to processData because, when this method is called for child tables, we expect validSegments to be set; in processMeta they are not set. I have passed carbonTable to avoid the metastore call, so it should not cause problems.
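
    A hedged sketch of the shape described above, computing currPartitions inside processData from the already resolved table (the getCurrentPartitions helper and its exact signature are assumptions, not confirmed from the PR):

        currPartitions = if (table.isHivePartitionTable) {
          // use the table resolved earlier, so no extra metastore lookup happens here
          CarbonFilters.getCurrentPartitions(
            sparkSession,
            TableIdentifier(table.getTableName, Some(table.getDatabaseName))).orNull
        } else {
          null
        }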


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178238607
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
    @@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       override def executeCompaction(): Unit = {
         val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         val loadMetaDataDetails = identifySegmentsToBeMerged()
    -    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    // If segmentFile name is specified in load details then segment is for partition table
    +    // therefore the segment file name should be loadName#segmentFileName.segment
    +    val segments = loadMetaDataDetails.asScala.map {
    +      loadDetail => if (loadDetail.getSegmentFile != null) {
    +        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
    +      } else {
    +        loadDetail.getLoadName
    +      }
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178239696
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    --- End diff --
   
    removed this method call


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178239706
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178264718
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178264753
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,440 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import java.io.{File, FileWriter}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    --- End diff --
   
    Added an insert overwrite test case.
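
    A hedged sketch of what such a test could look like (table, datamap and column names here are illustrative, not the ones added in the PR):

        test("insert overwrite into a static partition with a preaggregate datamap") {
          sql("drop table if exists partitionone")
          sql("create table partitionone (id int, name string) " +
            "partitioned by (year int) stored by 'carbondata'")
          sql("create datamap p1 on table partitionone using 'preaggregate' as " +
            "select year, sum(id) from partitionone group by year")
          sql("insert into partitionone partition(year=2017) select 1, 'a'")
          // overwriting the partition should also refresh the aggregate table
          sql("insert overwrite table partitionone partition(year=2017) select 2, 'b'")
          checkAnswer(
            sql("select year, sum(id) from partitionone group by year"),
            Seq(Row(2017, 2)))
        }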


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4685/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3458/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4193/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4686/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3459/



---