http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/jira-Closed-CARBONDATA-3851-Merge-Update-and-Insert-with-Partition-Table-is-giving-different-resultss-tp103003.html
Sachin Ramachandra Setty closed CARBONDATA-3851.
This PR resolved this issue.
> Merge Update and Insert with Partition Table is giving different results in different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
> Key: CARBONDATA-3851
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
> Project: CarbonData
> Issue Type: Bug
> Components: spark-integration
> Affects Versions: 2.0.0
> Reporter: Sachin Ramachandra Setty
> Priority: Major
> Fix For: 2.1.0
>
>
> The result sets differ when the same queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes).
> Steps to reproduce the issue:
> {code}
> import scala.collection.JavaConverters._
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.
> {CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.
> {BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits._
> sql("drop table if exists order").show()
> sql("drop table if exists order_hist").show()
> sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
> val initframe = sc.parallelize(1 to 10, 4).map
> { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}
> .toDF("id", "name", "c_name", "quantity", "price", "state")
> initframe.write
> .format("carbondata")
> .option("tableName", "order")
> .option("partitionColumns", "c_name")
> .mode(SaveMode.Overwrite)
> .save()
> val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
> val dwSelframe = dwframe.as("A")
> val ds1 = sc.parallelize(3 to 10, 4)
> .map {x =>
> if (x <= 4)
> { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) }
> else
> { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
> }.toDF("id", "name", "c_name", "quantity", "price", "state")
> ds1.show()
> val ds2 = sc.parallelize(1 to 2, 4)
> .map
> {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
> .toDS().toDF()
> ds2.show()
> val ds3 = ds1.union(ds2)
> ds3.show()
> val odsframe = ds3.as("B")
> var matches = Seq.empty[MergeMatch]
> val updateMap = Map(col("id") -> col("A.id"),
> col("price") -> expr("B.price + 1"),
> col("state") -> col("B.state"))
> val insertMap = Map(col("id") -> col("B.id"),
> col("name") -> col("B.name"),
> col("c_name") -> col("B.c_name"),
> col("quantity") -> col("B.quantity"),
> col("price") -> expr("B.price * 100"),
> col("state") -> col("B.state"))
> val insertMap_u = Map(col("id") -> col("id"),
> col("name") -> col("name"),
> col("c_name") -> lit("insert"),
> col("quantity") -> col("quantity"),
> col("price") -> expr("price"),
> col("state") -> col("state"))
> val insertMap_d = Map(col("id") -> col("id"),
> col("name") -> col("name"),
> col("c_name") -> lit("delete"),
> col("quantity") -> col("quantity"),
> col("price") -> expr("price"),
> col("state") -> col("state"))
> matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
> matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
> matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))
> {code}
>
> SQL Queries :
> {code}
> sql("select count(*) from order").show()
> sql("select count(*) from order where state = 2").show()
> sql("select price from order where id = 'newid1'").show()
> sql("select count(*) from order_hist where c_name = 'delete'").show()
> sql("select count(*) from order_hist where c_name = 'insert'").show()
> {code}
> Results in spark-shell --master yarn
> {code}
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> {code}
> Results in spark-shell --master local
> {code}
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> |7500|
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
> {code}