[jira] [Closed] (CARBONDATA-3851) Merge Update and Insert with Partition Table is giving different results in different spark deploy modes

Akash R Nilugal (Jira)

     [ https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Ramachandra Setty closed CARBONDATA-3851.
------------------------------------------------

This issue was resolved by the following PR:

https://github.com/apache/carbondata/pull/3835

> Merge Update and Insert with Partition Table is giving different results in different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3851
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
>             Project: CarbonData
>          Issue Type: Bug
>          Components: spark-integration
>    Affects Versions: 2.0.0
>            Reporter: Sachin Ramachandra Setty
>            Priority: Major
>             Fix For: 2.1.0
>
>
> The result sets differ when the same queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes).
> Steps to reproduce the issue:
> {code}
> import scala.collection.JavaConverters._
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits._
> sql("drop table if exists order").show()
> sql("drop table if exists order_hist").show()
> sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
> val initframe = sc.parallelize(1 to 10, 4)
>  .map { x => ("id"+x, s"order$x", s"customer$x", x*10, x*75, 1) }
>  .toDF("id", "name", "c_name", "quantity", "price", "state")
> initframe.write
>  .format("carbondata")
>  .option("tableName", "order")
>  .option("partitionColumns", "c_name")
>  .mode(SaveMode.Overwrite)
>  .save()
> val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
> val dwSelframe = dwframe.as("A")
> val ds1 = sc.parallelize(3 to 10, 4)
>  .map { x =>
>    if (x <= 4) { ("id"+x, s"order$x", s"customer$x", x*10, x*75, 2) }
>    else { ("id"+x, s"order$x", s"customer$x", x*10, x*75, 1) }
>  }.toDF("id", "name", "c_name", "quantity", "price", "state")
> ds1.show()
> val ds2 = sc.parallelize(1 to 2, 4)
>  .map { x => ("newid"+x, s"order$x", s"customer$x", x*10, x*75, 1) }
>  .toDS().toDF()
>  ds2.show()
>  val ds3 = ds1.union(ds2)
>  ds3.show()
> val odsframe = ds3.as("B")
> var matches = Seq.empty[MergeMatch]
>  val updateMap = Map(col("id") -> col("A.id"),
>  col("price") -> expr("B.price + 1"),
>  col("state") -> col("B.state"))
> val insertMap = Map(col("id") -> col("B.id"),
>  col("name") -> col("B.name"),
>  col("c_name") -> col("B.c_name"),
>  col("quantity") -> col("B.quantity"),
>  col("price") -> expr("B.price * 100"),
>  col("state") -> col("B.state"))
> val insertMap_u = Map(col("id") -> col("id"),
>  col("name") -> col("name"),
>  col("c_name") -> lit("insert"),
>  col("quantity") -> col("quantity"),
>  col("price") -> expr("price"),
>  col("state") -> col("state"))
> val insertMap_d = Map(col("id") -> col("id"),
>  col("name") -> col("name"),
>  col("c_name") -> lit("delete"),
>  col("quantity") -> col("quantity"),
>  col("price") -> expr("price"),
>  col("state") -> col("state"))
> matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
>  matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
>  matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))
> {code}
>  
> SQL Queries :
> {code}
> sql("select count(*) from order").show()
>  sql("select count(*) from order where state = 2").show()
>  sql("select price from order where id = 'newid1'").show()
>  sql("select count(*) from order_hist where c_name = 'delete'").show()
>  sql("select count(*) from order_hist where c_name = 'insert'").show()
> {code}
> Results in spark-shell --master yarn
> {code}
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |      10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> {code}
> Results in spark-shell --master local
> {code}
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |      10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> | 7500|
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
> {code}
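
For reference, the --master local numbers quoted above are the ones the merge rules in the steps imply. The following is only a minimal plain-Scala sketch of that arithmetic, with no Spark or CarbonData involved. MergeModel, Order and row are hypothetical names introduced for illustration, and the sketch assumes the merge joins A and B on id, since the steps as posted do not show the call that runs the merge.

```scala
// Plain-Scala model of the merge rules from the report (no Spark, no CarbonData).
// Assumption: target A and source B are matched on id.
object MergeModel {
  final case class Order(id: String, name: String, cName: String,
                         quantity: Int, price: Int, state: Int)

  // Same row shape as initframe / ds1 / ds2 in the report.
  def row(prefix: String, x: Int, state: Int): Order =
    Order(prefix + x, s"order$x", s"customer$x", x * 10, x * 75, state)

  // Target table "order": id1..id10, all with state 1.
  val target: Map[String, Order] =
    (1 to 10).map(x => row("id", x, 1)).map(o => o.id -> o).toMap

  // Source ds3: id3..id10 (state 2 for id3 and id4) plus newid1 and newid2.
  val source: Map[String, Order] =
    ((3 to 10).map(x => row("id", x, if (x <= 4) 2 else 1)) ++
      (1 to 2).map(x => row("newid", x, 1))).map(o => o.id -> o).toMap

  // WhenMatched(A.state =!= B.state): price = B.price + 1, state = B.state.
  val updated: Map[String, Order] = target.collect {
    case (id, a) if source.get(id).exists(_.state != a.state) =>
      val b = source(id)
      id -> a.copy(price = b.price + 1, state = b.state)
  }

  // WhenNotMatched: rows present only in the source are inserted with price * 100.
  val inserted: Map[String, Order] = source.collect {
    case (id, b) if !target.contains(id) => id -> b.copy(price = b.price * 100)
  }

  // WhenNotMatchedAndExistsOnlyOnTarget: rows present only in the target are deleted.
  val deletedIds: Set[String] = target.keySet.diff(source.keySet)

  // Final state of "order" after deletes, updates and inserts.
  val merged: Map[String, Order] =
    target.filter { case (id, _) => !deletedIds(id) } ++ updated ++ inserted

  // order_hist gets one row per updated row ("insert") and per deleted row ("delete").
  val history: Seq[(String, String)] =
    updated.keys.toSeq.map(_ -> "insert") ++ deletedIds.toSeq.map(_ -> "delete")

  val totalCount: Int  = merged.size
  val state2Count: Int = merged.values.count(_.state == 2)
  val newid1Price: Int = merged("newid1").price
  val histDelete: Int  = history.count(_._2 == "delete")
  val histInsert: Int  = history.count(_._2 == "insert")

  def main(args: Array[String]): Unit =
    println(s"$totalCount $state2Count $newid1Price $histDelete $histInsert")
}
```

Under these assumptions the model gives 10, 2, 7500, 2 and 2 for the five queries, in the order they are listed under SQL Queries. That agrees with the local results, while the yarn results look as if none of the merge actions had been applied.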



--
This message was sent by Atlassian Jira
(v8.3.4#803005)