jackylk commented on a change in pull request #3431: [CARBONDATA-3566] Support add segment for partition table
URL: https://github.com/apache/carbondata/pull/3431#discussion_r345160280

##########
File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -89,27 +92,118 @@ case class CarbonAddLoadCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
     }
-    val segmentPath = options.getOrElse(
-      "path", throw new UnsupportedOperationException("PATH is manadatory"))
+    var inputPath = options.getOrElse(
+      "path", throw new UnsupportedOperationException("PATH is mandatory"))
-    val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
-
-    val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+    // infer schema and collect FileStatus for all partitions
+    val (inputPathSchema, lastLevelDirFileMap) =
+      MixedFormatHandler.collectInfo(sparkSession, options, inputPath)
+    var inputPathCarbonFields = inputPathSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
-    })
-
-    val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+    }
+    val carbonTableSchema = new Schema(tableSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
     })
+    // update schema if has partition
+    val inputPathTableFields = if (carbonTable.isHivePartitionTable) {
+      val partitions = options.getOrElse("partition",
+        throw new AnalysisException(
+          "partition option is required when adding segment to partition table")
+      )
+      // extract partition given by user, partition option should be form of "a:int, b:string"
+      val partitionFields = partitions.split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(_.toLowerCase)
+        .map { input =>
+          val nameAndDataType = input.split(":")
+          if (nameAndDataType.size == 2) {
+            new Field(nameAndDataType(0), nameAndDataType(1))
+          } else {
+            throw new AnalysisException(s"invalid partition option: ${options.toString()}")
+          }
+        }
+      // validate against the partition in carbon table
+      val carbonTablePartition = getCarbonTablePartition(sparkSession)
+      if (!partitionFields.sameElements(carbonTablePartition)) {
+        throw new AnalysisException(
+          s"""
+             |Partition is not same. Carbon table partition is :
+             |${carbonTablePartition.mkString(",")} and input segment partition is :
+             |${partitionFields.mkString(",")}
+             |""".stripMargin)
+      }
+      inputPathCarbonFields ++ partitionFields
+    } else {
+      if (options.contains("partition")) {
+        throw new AnalysisException(s"partition option is not required for non-partition table")

Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]

With regards,
Apache Git Services |
Free forum by Nabble | Edit this page |