Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178217935

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
@@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
   override def executeCompaction(): Unit = {
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    val loadMetaDataDetails = identifySegmentsToBeMerged()
-    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+    // If segmentFile name is specified in load details then segment is for partition table
+    // therefore the segment file name should be loadName#segmentFileName.segment
+    val segments = loadMetaDataDetails.asScala.map {
+      loadDetail => if (loadDetail.getSegmentFile != null) {
+        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
+      } else {
+        loadDetail.getLoadName
+      }
--- End diff --

Just use new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString instead of doing this.

---
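A minimal sketch of the suggested rewrite (hedged: this assumes Segment lives in org.apache.carbondata.core.datamap and that its toString renders "loadName#segmentFileName" when a segment file is present, and just the load name otherwise):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.datamap.Segment

    // Segment.toString already encodes the optional segment file, so the
    // explicit null check and manual "#" concatenation become unnecessary.
    val segments = loadMetaDataDetails.asScala.map { loadDetail =>
      new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString
    }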
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178218263

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
    carbonProperty.addProperty("zookeeper.enable.lock", "false")
-
+    currPartitions = if (table.isHivePartitionTable) {
--- End diff --

This is a metastore call; it is not supposed to be kept under processData.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178219012

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.partition

 import java.util
+import java.util.UUID
--- End diff --

Do you allow dropping partitions directly on the aggregate table? I mean not through the parent table.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178219637

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
--- End diff --

Just use parentTable.isHivePartitionTable.

---
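For illustration, the guard being asked for (a sketch, assuming CarbonTable.isHivePartitionTable covers both the null check and the NATIVE_HIVE partition-type check):

    // isHivePartitionTable folds the two conditions into a single call.
    val childPartitionColumns = if (parentTable.isHivePartitionTable) {
      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
      PreAggregateUtil.extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    } else {
      Seq()
    }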
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222428

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
--- End diff --

Why is it required to extract the columns again? Can't you get them from fieldRelationMap?

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222584

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+    } else {Seq()}
--- End diff --

Format it properly.

---
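The formatted form being requested is simply:

    } else {
      Seq()
    }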
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222846

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+    } else {Seq()}
+    val partitionerFields = fieldRelationMap.collect {
+      case (field, dataMapField) if childPartitionColumns
+        .exists(childCol =>
+          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+            dataMapField.aggregateFunction.isEmpty) =>
+        PartitionerField(field.name.get, field.dataType, field.columnComment)
+    }.toSeq
--- End diff --

The order of partition columns is important. Please make sure that the parent partition column order and the child partition column order are the same.

---
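One way to guarantee that ordering (a sketch, not necessarily the PR's actual fix): iterate the parent's partition columns in their declared order and pick the matching child field for each, so the child's partitioner fields inherit the parent's order.

    // parentPartitionColumns is assumed to hold the parent table's partition
    // columns in declared order; collectFirst picks the matching
    // un-aggregated child field for each of them.
    val partitionerFields = parentPartitionColumns.flatMap { parentCol =>
      fieldRelationMap.collectFirst {
        case (field, dataMapField)
          if dataMapField.aggregateFunction.isEmpty &&
             dataMapField.columnTableRelationList.get.head.parentColumnName == parentCol =>
          PartitionerField(field.name.get, field.dataType, field.columnComment)
      }
    }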
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178223172

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+    } else {Seq()}
+    val partitionerFields = fieldRelationMap.collect {
+      case (field, dataMapField) if childPartitionColumns
+        .exists(childCol =>
+          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+            dataMapField.aggregateFunction.isEmpty) =>
+        PartitionerField(field.name.get, field.dataType, field.columnComment)
+    }.toSeq
--- End diff --

Please add some tests to make sure the order of the partition columns is the same as the parent partition columns.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178237773

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.partition

 import java.util
+import java.util.UUID
--- End diff --

Not allowed.

---
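A sketch of the kind of check this implies (the placement, the isChildDataMap accessor, and the message are illustrative; the actual guard in the PR may differ):

    // Reject partition drops issued directly against a pre-aggregate (child) table.
    if (table.isChildDataMap) {
      throw new UnsupportedOperationException(
        "Cannot drop a partition directly on an aggregate table; " +
        "drop it on the parent table instead")
    }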
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178237981

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
    carbonProperty.addProperty("zookeeper.enable.lock", "false")
-
+    currPartitions = if (table.isHivePartitionTable) {
--- End diff --

Actually I had to move getPartitions to processData because when this method is called for child tables we expect validSegments to be set; in processMeta they are not set. I have passed carbonTable to avoid the metastore call, so it should not cause problems.

---
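A rough sketch of the arrangement described here (fetchPartitionsFromCatalog is a hypothetical stand-in for the actual partition-listing helper):

    // In processData: by this point validSegments are set even for child
    // tables, and the already-resolved CarbonTable is reused so no extra
    // metastore lookup is needed.
    currPartitions = if (table.isHivePartitionTable) {
      fetchPartitionsFromCatalog(sparkSession, table)  // hypothetical helper
    } else {
      null
    }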
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178238607

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
@@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
   override def executeCompaction(): Unit = {
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    val loadMetaDataDetails = identifySegmentsToBeMerged()
-    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+    // If segmentFile name is specified in load details then segment is for partition table
+    // therefore the segment file name should be loadName#segmentFileName.segment
+    val segments = loadMetaDataDetails.asScala.map {
+      loadDetail => if (loadDetail.getSegmentFile != null) {
+        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
+      } else {
+        loadDetail.getLoadName
+      }
--- End diff --

OK.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178239696

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
--- End diff --

Removed this method call.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178239706

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+    } else {Seq()}
--- End diff --

Done.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178264718

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
    val df = sparkSession.sql(updatedQuery)
    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
      df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
    val fields = fieldRelationMap.keySet.toSeq
    val tableProperties = mutable.Map[String, String]()
+    val childPartitionColumns = if (partitionInfo != null &&
+        partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      PreAggregateUtil
+        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+    } else {Seq()}
+    val partitionerFields = fieldRelationMap.collect {
+      case (field, dataMapField) if childPartitionColumns
+        .exists(childCol =>
+          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+            dataMapField.aggregateFunction.isEmpty) =>
+        PartitionerField(field.name.get, field.dataType, field.columnComment)
+    }.toSeq
--- End diff --

Done.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178264753

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
@@ -0,0 +1,440 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import java.io.{File, FileWriter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
--- End diff --

Added an insert overwrite test case.

---
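For reference, a hypothetical shape of such an insert-overwrite test (table, datamap, and column names are illustrative; the actual test added in the PR may differ):

    test("insert overwrite partitioned table with preaggregate datamap") {
      sql("drop table if exists maintable")
      sql("create table maintable(id int, name string) " +
          "partitioned by (city string) stored by 'carbondata'")
      sql("create datamap agg_city on table maintable using 'preaggregate' as " +
          "select city, sum(id) from maintable group by city")
      sql("insert into maintable select 1, 'a', 'bangalore'")
      // Overwrite the same partition so the expected result holds under both
      // static and dynamic partition-overwrite semantics.
      sql("insert overwrite table maintable select 2, 'b', 'bangalore'")
      checkAnswer(
        sql("select city, sum(id) from maintable group by city"),
        Seq(Row("bangalore", 2)))
    }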
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4685/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3458/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109

SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4193/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4686/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3459/

---