[GitHub] [carbondata] kunal642 commented on a change in pull request #3431: [CARBONDATA-3566] Support add segment for partition table


GitBox
kunal642 commented on a change in pull request #3431: [CARBONDATA-3566] Support add segment for partition table
URL: https://github.com/apache/carbondata/pull/3431#discussion_r345022382
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 ##########
 @@ -89,27 +92,118 @@ case class CarbonAddLoadCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
     }
-    val segmentPath = options.getOrElse(
-      "path", throw new UnsupportedOperationException("PATH is manadatory"))
+    var inputPath = options.getOrElse(
+      "path", throw new UnsupportedOperationException("PATH is mandatory"))
 
-    val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
-
-    val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+    // infer schema and collect FileStatus for all partitions
+    val (inputPathSchema, lastLevelDirFileMap) =
+      MixedFormatHandler.collectInfo(sparkSession, options, inputPath)
+    var inputPathCarbonFields = inputPathSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
-    })
-
-    val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+    }
+    val carbonTableSchema = new Schema(tableSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
     })
 
+    // update schema if has partition
+    val inputPathTableFields = if (carbonTable.isHivePartitionTable) {
+      val partitions = options.getOrElse("partition",
+        throw new AnalysisException(
+          "partition option is required when adding segment to partition table")
+      )
+      // extract partition given by user, partition option should be form of "a:int, b:string"
+      val partitionFields = partitions.split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(_.toLowerCase)
+        .map { input =>
 
 Review comment:
   Combine the chained map and filter calls into a single pass to avoid unnecessary looping.
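
   A minimal sketch of what this suggestion could look like, not the change that was actually merged. The object name `CombinePartitionParsing` and the sample `partitions` string are hypothetical; only the `split`/`trim`/`nonEmpty`/`toLowerCase` steps come from the diff above. Because `split` returns an `Array`, each chained `map` or `filter` builds a new intermediate array; the two alternatives below avoid that.

```scala
object CombinePartitionParsing {
  // Hypothetical user input in the "a:int, b:string" form described in the diff.
  val partitions: String = "A:Int, b:STRING, ,"

  // Chained form from the diff: every step allocates an intermediate Array.
  val chained: Array[String] = partitions.split(",")
    .map(_.trim)
    .filter(_.nonEmpty)
    .map(_.toLowerCase)

  // Alternative 1: one traversal with flatMap, normalising and filtering together.
  val singlePass: Array[String] = partitions.split(",").flatMap { raw =>
    val field = raw.trim.toLowerCase
    if (field.nonEmpty) Some(field) else None
  }

  // Alternative 2: keep the chained style but run it lazily over an iterator,
  // so elements flow through all steps one at a time and only the final
  // Array is materialised.
  val lazyPass: Array[String] = partitions.split(",").iterator
    .map(_.trim)
    .filter(_.nonEmpty)
    .map(_.toLowerCase)
    .toArray
}
```

   Both alternatives produce the same elements as the chained form. The per-field parsing done in the final `.map { input => ... }` of the diff could be folded into the same `flatMap` body or appended to the iterator chain.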

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services