Login  Register

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4151: [CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL statement contains multiple inserts

Posted by GitBox on Jun 18, 2021; 9:19pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-4151-WIP-Fix-from-xx-Insert-into-select-fais-tp108907p108978.html


akashrn5 commented on a change in pull request #4151:
URL: https://github.com/apache/carbondata/pull/4151#discussion_r654207938



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")

Review comment:
       here please collect the segments IDs returned from the insert command and validate them also.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))

Review comment:
       ```suggestion
       checkAnswer(sql("select *cfrom catalog_returns_6"), Seq(Row(2, 3, 1)))
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+    val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+    // To make GenericInternalRow to UnsafeRow
+    val row = unsafeProjection(internalRow.head)
+    Seq(row)
+  }
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }

Review comment:
       here no need to override all, just can override abstract one like` doExecute()` and `output`, others we are not using so no need to override since no specific logic

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +59,14 @@ object DMLStrategy extends SparkStrategy {
       case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
       case insert: InsertIntoCarbonTable =>
-        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        if (insert.isMultipleInserts) {
+          // when multiple insert statements are given with single plan (Union),
+          // Spark expects row to be of UnsafeRow. Here use UnionCommandExec to
+          // implement custom sideEffectResult and return row as UnsafeRow
+          UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil

Review comment:
       here the comment gives little different meaning, here you can say about how insert command in carbon returns the rows containing corresponding segmentIDs and incase of this specific multiple scenario the Union node executes in the physical plan phase of the command, so the rows should be of unsafe row object. So we should override the `sideEffectResult` to prepare the content of command's corresponding rdd from physical plan of insert into command. You can clean it and update the command something like this

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {

Review comment:
       give a class level comment

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))

Review comment:
       ```suggestion
       checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
##########
@@ -250,13 +251,19 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
 
       case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
         if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation, child)
+        var isMultipleInserts = false

Review comment:
       ```suggestion
           var containsMultipleInserts = false
   ```
   Please in all places

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
       case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
       case insert: InsertIntoCarbonTable =>
-        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        if (insert.isMultipleInserts) {
+          // Successful insert in carbon will return segment ID as row.

Review comment:
       ```suggestion
             // Successful insert in carbon will return segment ID in a row.
   ```




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]