[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
GitHub user ravipesala opened a pull request:

    https://github.com/apache/carbondata/pull/1755

    [CARBONDATA-1976][PARTITION] Support combination of dynamic and static partitions. And fix concurrent partition load issue.

    Support a combination of dynamic and static partitions.
    For example, the user can issue a load such as:
    ```
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic
     PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
    ```
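
    For context, a minimal sketch of a table the load above could target (the schema is hypothetical; only the LOAD statement comes from this PR). As in Hive, static partition columns must precede dynamic ones in the PARTITION clause:
    ```
     // Hypothetical schema, for illustration only.
     sql(s"""CREATE TABLE loadstaticpartitiondynamic (designation String, salary Int)
     PARTITIONED BY (empno Int, empname String)
     STORED BY 'carbondata'""")
    ```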
    Also fix the concurrent partition load issue, where deleting temporary folders sometimes throws a file-not-found exception.
   
    Be sure to complete the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [X] Any interfaces changed? NO
     
     - [X] Any backward compatibility impacted? NO
     
     - [X] Document update required? NO
   
     - [X] Testing done
           Tests added
           
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata static-dynamic-partition-mix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1755
   
----
commit b387bbe37e577822a6985e46e8de37cf87ebec59
Author: ravipesala <ravi.pesala@...>
Date:   2018-01-03T05:18:09Z

    Fix dynamic and static partition data loading combination

commit 205f61c18281053b043d3e9d46f0f32f7b203df5
Author: ravipesala <ravi.pesala@...>
Date:   2018-01-03T12:55:09Z

    Fixed concurrent partition load issue.

----


---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1293/



---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2523/



---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2683/



---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591267
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---
    @@ -28,7 +31,7 @@ import org.apache.carbondata.core.util.CarbonProperties
     import org.apache.carbondata.core.util.path.CarbonTablePath
     
     class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
    -
    +  var executorService: ExecutorService = _
    --- End diff --
   
    Move this inside the test case, since only that test case uses it.


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591690
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -611,7 +611,7 @@ case class CarbonLoadDataCommand(
           val output = if (partition.nonEmpty) {
             catalogTable.schema.map{ attr =>
               attributes.find(_.name.equalsIgnoreCase(attr.name)).get
    -        }.filter(attr => partition.get(attr.name).isEmpty)
    +        }.filter(attr => partition.getOrElse(attr.name, None).isEmpty)
    --- End diff --
   
    It seems the same as before?
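
    The two are not quite the same when a partition column is present with a dynamic value. A minimal Scala sketch of the difference, assuming `partition` is a `Map[String, Option[String]]` in which `None` marks a dynamic column (as in this command):
    ```
    // Assuming partition: Map[String, Option[String]], where None marks
    // a dynamic partition column and Some(v) a static value.
    val partition = Map("empno" -> Some("1"), "empname" -> None)

    // Key present with a dynamic (None) value: the two expressions disagree.
    partition.get("empname").isEmpty             // false: Some(None) is non-empty
    partition.getOrElse("empname", None).isEmpty // true: the inner Option is None

    // Key absent entirely: both agree.
    partition.get("dept").isEmpty                // true
    partition.getOrElse("dept", None).isEmpty    // true
    ```
    So the new form also treats dynamic partition columns as not statically specified, which is what a mixed static/dynamic load needs.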


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591969
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    --- End diff --
   
    Provide a variable name to improve readability.
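
    A sketch of what is being asked for (the name is illustrative, not from the codebase): bind the bare positional boolean to a named value so the call site documents itself:
    ```
    // Illustrative name only: false here means "roll back", not "commit".
    val commitDroppedPartitions = false
    new CarbonDropPartitionCommitRDD(
      sparkSession.sparkContext,
      table.getTablePath,
      segments.asScala,
      commitDroppedPartitions,
      uniqueId).collect()
    ```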


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591980
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    --- End diff --
   
    Provide a variable name to improve readability.


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159591992
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    --- End diff --
   
    Provide a variable name to improve readability.


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159592167
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    +        uniqueId).collect()
    +      // Update the loadstatus with update time to clear cache from driver.
    +      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
    +        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
    --- End diff --
   
    make these two lines more readable
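
    A sketch of one way to do that (splitting the chained call and naming the intermediate; the name is illustrative):
    ```
    // Illustrative reshaping only; behavior unchanged.
    val validSegments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
      .getValidAndInvalidSegments
      .getValidSegments
    val segmentSet = new util.HashSet[String](validSegments)
    ```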


---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2535/



---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159614552
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---
    @@ -28,7 +31,7 @@ import org.apache.carbondata.core.util.CarbonProperties
     import org.apache.carbondata.core.util.path.CarbonTablePath
     
     class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
    -
    +  var executorService: ExecutorService = _
    --- End diff --
   
    Yes, but I want to shut the executor down in afterAll even if that test case fails; that's why I added it at class level.
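
    A sketch of the pattern being described (assumed shape, not the actual test file; the real suite extends QueryTest):
    ```
    import java.util.concurrent.{ExecutorService, Executors}

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    // Sketch only: the pool lives at class level so afterAll can always reach it.
    class ConcurrentLoadExample extends FunSuite with BeforeAndAfterAll {
      var executorService: ExecutorService = _

      test("concurrent partition load") {
        executorService = Executors.newFixedThreadPool(4)
        // ... submit concurrent LOAD DATA tasks and await their futures ...
      }

      override def afterAll(): Unit = {
        // Runs even when the test above fails, so the pool is always shut down.
        if (executorService != null) {
          executorService.shutdownNow()
        }
      }
    }
    ```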


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159614744
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159614773
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159614791
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159615305
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    +        uniqueId).collect()
    +      // Update the loadstatus with update time to clear cache from driver.
    +      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
    +        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
    --- End diff --
   
    ok


---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2699/



---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1313/



---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2547/



---

[GitHub] carbondata issue #1755: [CARBONDATA-1976][PARTITION] Support combination of ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1755
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1320/



---