Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2415#discussion_r208224446
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---
@@ -92,11 +89,38 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
).filter(_._2.isDefined)
.map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")
+ val partition: Seq[String] = if (options.partitionColumns.isDefined) {
+ if (options.partitionColumns.get.toSet.size != options.partitionColumns.get.length) {
+ throw new MalformedCarbonCommandException(s"repeated partition column")
+ }
+ options.partitionColumns.get.map { column =>
+ val field = schema.fields.find(_.name.equalsIgnoreCase(column))
+ if (field.isEmpty) {
+ throw new MalformedCarbonCommandException(s"invalid partition column: $column")
+ }
+ s"$column ${field.get.dataType.typeName}"
+ }
+ } else {
+ Seq()
+ }
+
+ val schemaWithoutPartition = if (options.partitionColumns.isDefined) {
+ val fields = schema.filterNot(field => options.partitionColumns.get.contains(field.name))
--- End diff --
fixed
---