ShreelekhyaG opened a new pull request #4151: URL: https://github.com/apache/carbondata/pull/4151

### Why is this PR needed?
When multiple inserts are issued in a single query, execution fails from SparkPlan with: `java.lang.ClassCastException: GenericInternalRow cannot be cast to UnsafeRow`. For every successful insert/load we return the Segment ID as a row. For multiple inserts we also return a row containing the Segment ID, but while processing it in Spark a `ClassCastException` is thrown.

### What changes were proposed in this PR?
When a multiple-insert query is given, Spark plans it with a `Union` node. Based on its presence, changes were made to use the flag `isMultipleInserts` to identify this case and return an empty row.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- Yes
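For reference, the failing pattern is a Hive-style multi-insert, where one FROM clause feeds several INSERT statements and Spark plans them under a single `Union` node. A minimal illustration (table and column names are made up, and a `SparkSession` named `spark` is assumed):

```scala
// Illustrative only: one source scan feeding two INSERT targets is planned with
// a Union node over the two insert commands; collecting the command's result
// is where the GenericInternalRow -> UnsafeRow cast used to fail.
spark.sql(
  "from source_table " +
  "insert overwrite table target_table partition (dt) " +
  "select col1, col2, dt where dt is not null " +
  "insert overwrite table target_table partition (dt) " +
  "select col1, col2, dt where dt is null")
```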
CarbonDataQA2 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-861257190 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5546/
CarbonDataQA2 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-861268787 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3803/
ShreelekhyaG commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-861580533 retest this please
CarbonDataQA2 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-861696472
akashrn5 commented on a change in pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#discussion_r652433296

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ##########

```diff
@@ -294,6 +294,13 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
       }
       throw ex
     }
+    if (internalOptions.contains("isMultipleInserts")) {
+      if (internalOptions.get("isMultipleInserts").get.toBoolean) {
+        // when multiple insert statements are given with single plan (Union),
+        // no need to return segment id as row.
+        return Seq.empty
```

Review comment:
   Here we return the segment ID from the load command so that application code integrated with CarbonData can make decisions based on it. If we don't return it in this case, we won't give valid information to the underlying application, which may lead to wrong decisions if applications rely on it. So we can't fix it this way.
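As an illustration of why the returned segment ID matters (a sketch, not CarbonData's documented API; the row layout with the segment ID as a single string column is an assumption, and `spark`, `target_table`, `staging_table` are made up):

```scala
// Sketch only: application-side use of the segment ID that a successful
// insert/load returns; the single string column holding the ID is assumed here.
val segmentId = spark
  .sql("insert into target_table select * from staging_table")
  .collect()
  .head
  .getString(0)
// e.g. record the segment for bookkeeping or trigger downstream handling
println(s"data loaded into segment $segmentId")
```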
akashrn5 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-862135644 Since our command classes extend Spark's command classes, `ExecutedCommandExec` is the physical operator that executes the `run` method of a `RunnableCommand`. In this case we return a row object, which is converted to a `GenericInternalRow` in `sideEffectResult` of `ExecutedCommandExec`; but during `Union` operation execution Spark expects to convert it to an `UnsafeRow`, and it fails with a cast exception. So, as suggested by Spark, we can override `sideEffectResult` and handle this case with the output of our command classes (the insert into command here), so that we send proper results from the physical node.
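A minimal sketch of the conversion step described above, assuming Spark 2.x Catalyst APIs; the helper name is illustrative, and the PR's actual implementation is the `UnionCommandExec` node quoted in the review diffs below:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{StructField, StructType}

// Sketch only: convert the external Rows returned by a RunnableCommand's run()
// (which Spark holds as GenericInternalRow) into UnsafeRow, so that a parent
// Union node in the physical plan can consume them.
def toUnsafeRows(commandResult: Seq[Row], output: Seq[Attribute]): Seq[UnsafeRow] = {
  val schema = StructType(output.map(a => StructField(a.name, a.dataType, a.nullable)))
  // external Row -> Catalyst InternalRow (a GenericInternalRow)
  val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
  // InternalRow -> UnsafeRow, projected with the command's output data types
  val toUnsafe = UnsafeProjection.create(output.map(_.dataType).toArray)
  commandResult.map(row => toUnsafe(toCatalyst(row).asInstanceOf[InternalRow]).copy())
}
```

The PR wraps this conversion inside a `LeafExecNode` whose `sideEffectResult` runs the command and returns the projected row, as shown in the review diffs that follow.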
ShreelekhyaG commented on a change in pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#discussion_r654251072

########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ##########

```diff
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
```

Review comment:
   Done

########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ##########

```diff
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))
```

Review comment:
   Done

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+    val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+    // To make GenericInternalRow to UnsafeRow
+    val row = unsafeProjection(internalRow.head)
+    Seq(row)
+  }
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
```

Review comment:
   ok, removed

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -57,7 +59,14 @@ object DMLStrategy extends SparkStrategy {
     case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
       ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
     case insert: InsertIntoCarbonTable =>
-      ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+      if (insert.isMultipleInserts) {
+        // when multiple insert statements are given with single plan (Union),
+        // Spark expects row to be of UnsafeRow. Here use UnionCommandExec to
+        // implement custom sideEffectResult and return row as UnsafeRow
+        UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
```

Review comment:
   Done

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
```

Review comment:
   Done

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
     case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
       ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
     case insert: InsertIntoCarbonTable =>
-      ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+      if (insert.isMultipleInserts) {
+        // Successful insert in carbon will return segment ID as row.
```

Review comment:
   done

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ##########

```diff
@@ -250,13 +251,19 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
     case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
       if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-      castChildOutput(p, relation, child)
+      var isMultipleInserts = false
```

Review comment:
   done
CarbonDataQA2 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-863947493
ShreelekhyaG commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-864034391 retest this please
akashrn5 commented on a change in pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#discussion_r654207938

########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ##########

```diff
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
```

Review comment:
   Here, please also collect the segment IDs returned from the insert command and validate them.

########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ##########

```diff
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))
```

Review comment:
   ```suggestion
       checkAnswer(sql("select *cfrom catalog_returns_6"), Seq(Row(2, 3, 1)))
   ```

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+    val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+    // To make GenericInternalRow to UnsafeRow
+    val row = unsafeProjection(internalRow.head)
+    Seq(row)
+  }
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
```

Review comment:
   There is no need to override all of these; just override the abstract ones like `doExecute()` and `output`. The others are not used here, so no need to override them since they carry no specific logic.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -57,7 +59,14 @@ object DMLStrategy extends SparkStrategy {
     case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
       ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
    case insert: InsertIntoCarbonTable =>
-      ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+      if (insert.isMultipleInserts) {
+        // when multiple insert statements are given with single plan (Union),
+        // Spark expects row to be of UnsafeRow. Here use UnionCommandExec to
+        // implement custom sideEffectResult and return row as UnsafeRow
+        UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
```

Review comment:
   The comment here gives a slightly different meaning. You can explain how the insert command in Carbon returns rows containing the corresponding segment IDs, and that in this specific multiple-insert scenario the Union node executes in the physical plan phase of the command, so the rows should be UnsafeRow objects. That is why we should override `sideEffectResult` to prepare the content of the command's corresponding RDD from the physical plan of the insert into command. You can clean it up and update the comment to something like this.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
```

Review comment:
   Give a class-level comment.

########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ##########

```diff
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))
```

Review comment:
   ```suggestion
       checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
   ```

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ##########

```diff
@@ -250,13 +251,19 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
     case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
       if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-      castChildOutput(p, relation, child)
+      var isMultipleInserts = false
```

Review comment:
   ```suggestion
         var containsMultipleInserts = false
   ```
   Please change this in all places.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ##########

```diff
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
     case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
       ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
     case insert: InsertIntoCarbonTable =>
-      ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+      if (insert.isMultipleInserts) {
+        // Successful insert in carbon will return segment ID as row.
```

Review comment:
   ```suggestion
           // Successful insert in carbon will return segment ID in a row.
   ```
akashrn5 commented on pull request #4151: URL: https://github.com/apache/carbondata/pull/4151#issuecomment-864380627 LGTM
asfgit closed pull request #4151: URL: https://github.com/apache/carbondata/pull/4151