Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r130503124 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java --- @@ -440,9 +510,17 @@ protected Expression getFilterPredicates(Configuration configuration) { for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> entry : segmentIndexMap.entrySet()) { SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey(); - int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo); + int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo); --- End diff -- **partitionId**...get**TaskId**FromTaskNo? Is this OK?
Github user lionelcao commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r130504413 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java --- @@ -440,9 +510,17 @@ protected Expression getFilterPredicates(Configuration configuration) { for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> entry : segmentIndexMap.entrySet()) { SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey(); - int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo); + int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo); --- End diff -- Yes, for a partition table we use the partitionId as the taskId.
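A minimal sketch of the convention described above. Only the getTaskIdFromTaskNo call appears in the diff; the taskNo value and its format here are hypothetical:

    // For a partitioned table the leading component of taskNo carries the
    // partition id, so the returned "task id" is really a partition id.
    val taskNo = "2_batchno0-0-1502089979937" // hypothetical taskNo format
    val partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)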
Github user lionelcao commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r130505076 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java --- @@ -65,6 +65,31 @@ public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitio this.partitionIds = new ArrayList<>(); } + /** + * add partition means split default partition, add in last directly + */ + public void addPartition(int addPartitionCount) { + for (int i = 0; i < addPartitionCount; i++) { + partitionIds.add(++MAX_PARTITION); + numPartitions++; + } + } + + /** + * e.g. original partition[0,1,2,3,4,5] + * split partition 2 to partition 6,7,8 (will not reuse 2) + * then sourcePartitionId is 2, newPartitionNumbers is 3 + * @param sourcePartitionIndex + * @param newPartitionNumbers + */ + public void splitPartition(int sourcePartitionIndex, int newPartitionNumbers) { --- End diff -- Carbon's partition design is different from Hive's: it is based on blocks, not paths. For a range partition table, split means adding new partitions between existing ones. For a list partition table, split allows the user to reduce the data volume of a partition.
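A minimal sketch of the id bookkeeping these two methods perform, following the Javadoc example in the diff; the partitionInfo instance and the resulting id values are illustrative:

    // Start from the Javadoc example: partitionIds = [0,1,2,3,4,5], MAX_PARTITION = 5.
    // Splitting partition 2 into 3 partitions assigns new ids 6, 7, 8 (2 is not reused).
    partitionInfo.splitPartition(2, 3) // sourcePartitionIndex = 2, newPartitionNumbers = 3
    // Adding partitions splits the default partition (id 0) and appends ids at the end.
    partitionInfo.addPartition(2) // assigns ids 9 and 10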
Github user lionelcao commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r130506012 --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java --- @@ -303,6 +303,20 @@ public String getCarbonIndexFilePath(String taskId, String partitionId, String s } } + public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, + int batchNo, String bucketNumber, String timeStamp, + ColumnarFormatVersion columnarFormatVersion) { + switch (columnarFormatVersion) { + case V1: + case V2: + return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + default: --- End diff -- V3 goes through the default branch. This function was not created by me; I only added batchNo to the original one to get the correct index path.
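For illustration, a call to the overload added in this diff; the carbonTablePath instance and the argument values are assumed, not taken from the PR:

    // V1/V2 delegate to the older overload without batchNo; V3 (the default
    // branch) builds the index file path including batchNo.
    val indexPath = carbonTablePath.getCarbonIndexFilePath(
      "2", "0", "1", 0, "0", "1502089979937", ColumnarFormatVersion.V3)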
Github user lionelcao commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r130506636 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java --- @@ -65,6 +65,31 @@ public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitio this.partitionIds = new ArrayList<>(); } + /** + * add partition means split default partition, add in last directly + */ + public void addPartition(int addPartitionCount) { + for (int i = 0; i < addPartitionCount; i++) { + partitionIds.add(++MAX_PARTITION); + numPartitions++; + } + } + + /** + * e.g. original partition[0,1,2,3,4,5] + * split partition 2 to partition 6,7,8 (will not reuse 2) + * then sourcePartitionId is 2, newPartitionNumbers is 3 + * @param sourcePartitionIndex + * @param newPartitionNumbers + */ + public void splitPartition(int sourcePartitionIndex, int newPartitionNumbers) { --- End diff -- "When will the partition-split action be triggered?" Please refer to my description in the 'Conversation' tab; the user can execute an alter table add or split statement.
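For reference, these are the statements as they appear in this PR's CarbonPartitionExample (t5 is the list partition table from that example):

    // alter list partition table t5 to add a partition
    spark.sql("Alter table t5 add partition ('OutSpace')")
    // alter list partition table t5 to split partition 4 into 3 independent partitions
    spark.sql("Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, NotGood)')")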
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Can one of the admins verify this patch?
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 SDV Build Failed with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/36/
Github user chenliang613 commented on the issue:
https://github.com/apache/carbondata/pull/1192 retest this please
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Build Success with Spark 1.6, Please check CI http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/736/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3333/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3334/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131056387 --- Diff: conf/carbon.properties.template --- @@ -42,6 +42,9 @@ carbon.enableXXHash=true #carbon.max.level.cache.size=-1 #enable prefetch of data during merge sort while reading data from sort temp files in data loading #carbon.merge.sort.prefetch=true +######## Alter Partition Configuration ######## +#Number of cores to be used while alter partition --- End diff -- Can you explain in the documentation what these cores are required for?
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131057020 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala --- @@ -101,17 +126,40 @@ object CarbonPartitionExample { spark.sql(""" | CREATE TABLE IF NOT EXISTS t5 | ( + | id Int, | vin String, | logdate Timestamp, | phonenumber Long, - | area String + | area String, + | salary Int |) | PARTITIONED BY (country String) | STORED BY 'carbondata' | TBLPROPERTIES('PARTITION_TYPE'='LIST', - | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ') + | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') """.stripMargin) + // load data into partition table + spark.sql(s""" + LOAD DATA LOCAL INPATH '$testData' into table t0 options('BAD_RECORDS_ACTION'='FORCE') + """) + spark.sql(s""" + LOAD DATA LOCAL INPATH '$testData' into table t5 options('BAD_RECORDS_ACTION'='FORCE') + """) + + // alter list partition table t5 to add a partition + spark.sql(s"""Alter table t5 add partition ('OutSpace')""".stripMargin) + // alter list partition table t5 to split partition 4 into 3 independent partition + spark.sql( + s""" + Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, NotGood)') + """.stripMargin) --- End diff -- After doing this, can you verify the partition is correct by using `desc formatted`?
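A minimal sketch of the verification being requested, using the t5 table from the example above; DESCRIBE FORMATTED is standard Spark SQL:

    // Print table metadata, including the partition information, after the split
    spark.sql("DESCRIBE FORMATTED t5").show(100, false)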
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131057207 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java --- @@ -107,6 +107,7 @@ // comma separated list of input files public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; + public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; --- End diff -- Modify this in `CarbonTableInputFormat`; we will use it in the future.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Build Failed with Spark 1.6, Please check CI http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/737/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131057576 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java --- @@ -321,6 +321,84 @@ private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configu } /** + * Read data in one segment. For alter table partition statement + * @param job + * @param targetSegment + * @param oldPartitionIdList get old partitionId before partitionInfo was changed + * @return + * @throws IOException + */ + public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment, + List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) + throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + List<String> invalidSegments = new ArrayList<>(); + List<UpdateVO> invalidTimestampsList = new ArrayList<>(); + + List<String> segmentList = new ArrayList<>(); + segmentList.add(targetSegment); + setSegmentsToAccess(job.getConfiguration(), segmentList); + try { + + // process and resolve the expression + Expression filter = getFilterPredicates(job.getConfiguration()); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + // this will be null in case of corrupt schema file. + if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + + // prune partitions for filter query on partition table + String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID); + BitSet matchedPartitions = null; + if (partitionInfo != null) { + matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo); + if (matchedPartitions != null) { + if (matchedPartitions.cardinality() == 0) { + return new ArrayList<InputSplit>(); + } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { + matchedPartitions = null; + } + } + } + + FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + // do block filtering and get split + List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions, + partitionInfo, oldPartitionIdList); + // pass the invalid segment to task side in order to remove index entry in task side + if (invalidSegments.size() > 0) { + for (InputSplit split : splits) { + ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); + ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList); + } + } + return splits; + } catch (IOException e) { + throw new RuntimeException("Can't get splits of the target segment ", e); + } + } + + private BitSet setMatchedPartitions(String partitionIds, Expression filter, + PartitionInfo partitionInfo) { + BitSet matchedPartitions = null; + if (null != partitionIds) { + String[] partList = partitionIds.replace("[", "").replace("]", "").split(","); + matchedPartitions = new BitSet(Integer.parseInt(partList[0])); --- End diff -- Can you add a comment here explaining why it uses `partList[0]`?
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1192 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3336/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131058061 --- Diff: processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonDataSpliterUtil.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.spliter; + +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.scan.result.BatchResult; + +public final class CarbonDataSpliterUtil { --- End diff -- Is this needed?
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131058295 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala --- @@ -184,6 +189,161 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab } } +/** + * Command for Alter Table Add & Split partition + * Add is a special case of Splitting the default partition (part0) + * @param alterTableSplitPartitionModel + */ +case class AlterTableSplitPartition(alterTableSplitPartitionModel: AlterTableSplitPartitionModel) + extends RunnableCommand { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def run(sparkSession: SparkSession): Seq[Row] = { + + val tableName = alterTableSplitPartitionModel.tableName + val dbName = alterTableSplitPartitionModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + val splitInfo = alterTableSplitPartitionModel.splitInfo + val partitionId = Integer.parseInt(alterTableSplitPartitionModel.partitionId) + val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) + val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) + + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.DROP_TABLE_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.ALTER_PARTITION_LOCK) + var locks = List.empty[ICarbonLock] + try { + locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, + locksToBeAcquired)(sparkSession) + val carbonMetastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val relation = carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier + val storePath = relation.tableMeta.storePath + if (relation == null) { + sys.error(s"Table $dbName.$tableName does not exist") + } + carbonMetastore.checkSchemasModifiedTimeAndReloadTables(storePath) + if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) { + LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName") + sys.error(s"Alter table failed. table not found: $dbName.$tableName") + } + val carbonLoadModel = new CarbonLoadModel() + + val table = relation.tableMeta.carbonTable + val partitionInfo = table.getPartitionInfo(tableName) + val partitionIdList = partitionInfo.getPartitionIds.asScala + // keep a copy of partitionIdList before update partitionInfo. + // will be used in partition data scan + val oldPartitionIdList: ArrayBuffer[Int] = new ArrayBuffer[Int]() + for (i: Integer <- partitionIdList) { --- End diff -- Do not use a for loop; use `map` or `addAll`.
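A sketch of the replacement being suggested, assuming the partitionInfo value from the diff above; the imports show what it relies on:

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    // Snapshot the partition ids before partitionInfo is mutated, without an explicit loop
    val oldPartitionIdList: ArrayBuffer[Int] =
      partitionInfo.getPartitionIds.asScala.map(_.intValue).to[ArrayBuffer]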
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1192#discussion_r131058343 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala --- @@ -184,6 +189,161 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab } } +/** + * Command for Alter Table Add & Split partition + * Add is a special case of Splitting the default partition (part0) + * @param alterTableSplitPartitionModel + */ +case class AlterTableSplitPartition(alterTableSplitPartitionModel: AlterTableSplitPartitionModel) + extends RunnableCommand { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def run(sparkSession: SparkSession): Seq[Row] = { + + val tableName = alterTableSplitPartitionModel.tableName + val dbName = alterTableSplitPartitionModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + val splitInfo = alterTableSplitPartitionModel.splitInfo + val partitionId = Integer.parseInt(alterTableSplitPartitionModel.partitionId) + val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) + val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) + + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.DROP_TABLE_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.ALTER_PARTITION_LOCK) + var locks = List.empty[ICarbonLock] + try { + locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, + locksToBeAcquired)(sparkSession) + val carbonMetastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val relation = carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier + val storePath = relation.tableMeta.storePath + if (relation == null) { + sys.error(s"Table $dbName.$tableName does not exist") + } + carbonMetastore.checkSchemasModifiedTimeAndReloadTables(storePath) + if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) { + LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName") + sys.error(s"Alter table failed. table not found: $dbName.$tableName") + } + val carbonLoadModel = new CarbonLoadModel() + + val table = relation.tableMeta.carbonTable + val partitionInfo = table.getPartitionInfo(tableName) + val partitionIdList = partitionInfo.getPartitionIds.asScala + // keep a copy of partitionIdList before update partitionInfo. + // will be used in partition data scan + val oldPartitionIdList: ArrayBuffer[Int] = new ArrayBuffer[Int]() + for (i: Integer <- partitionIdList) { + oldPartitionIdList.append(i) + } + + if (partitionInfo == null) { + sys.error(s"Table $tableName is not a partition table.") + } + if (partitionInfo.getPartitionType == PartitionType.HASH) { + sys.error(s"Hash partition table cannot be added or split!") + } + /** + * verify the add/split information and update the partitionInfo: + * 1. update rangeInfo/listInfo + * 2. update partitionIds + */ + val columnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType + val index = partitionIdList.indexOf(partitionId) + if (partitionInfo.getPartitionType == PartitionType.RANGE) { + val rangeInfo = partitionInfo.getRangeInfo.asScala.toList + val newRangeInfo = partitionId match { + case 0 => rangeInfo ++ splitInfo + case _ => rangeInfo.take(index - 1) ++ splitInfo ++ + rangeInfo.takeRight(rangeInfo.size - index) + } + CommonUtil.validateRangeInfo(newRangeInfo, columnDataType, + timestampFormatter, dateFormatter) + partitionInfo.setRangeInfo(newRangeInfo.asJava) + } else if (partitionInfo.getPartitionType == PartitionType.LIST) { + val originList = partitionInfo.getListInfo.asScala.map(_.asScala.toList).toList + if (partitionId != 0) { + val targetListInfo = partitionInfo.getListInfo.get(index - 1) + CommonUtil.validateSplitListInfo(targetListInfo.asScala.toList, splitInfo, originList) + } else { + CommonUtil.validateAddListInfo(splitInfo, originList) + } + val addListInfo = PartitionUtils.getListInfo(splitInfo.mkString(",")) + val newListInfo = partitionId match { + case 0 => originList ++ addListInfo + case _ => originList.take(index - 1) ++ addListInfo ++ + originList.takeRight(originList.size - index) + } + partitionInfo.setListInfo(newListInfo.map(_.asJava).asJava) + } + + if (partitionId == 0) { + partitionInfo.addPartition(splitInfo.size) + } else { + partitionInfo.splitPartition(index, splitInfo.size) + } + + val dataLoadSchema = new CarbonDataLoadSchema(table) + + carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) + carbonLoadModel.setTableName(carbonTableIdentifier.getTableName) + carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName) + carbonLoadModel.setStorePath(storePath) + val loadStartTime = CarbonUpdateUtil.readCurrentTime + carbonLoadModel.setFactTimeStamp(loadStartTime) + + val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val schemaFilePath = carbonTablePath.getSchemaFilePath + // read TableInfo + val tableInfo = carbonMetastore.getThriftTableInfo(carbonTablePath)(sparkSession) + --- End diff -- remove empty line