[jira] [Updated] (CARBONDATA-3851) Merge Update and Insert with Partition Table is giving different results in different spark deploy modes


Akash R Nilugal (Jira)

     [ https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Ramachandra Setty updated CARBONDATA-3851:
-------------------------------------------------
    Description:
The result sets differ when the same queries are run under spark-shell --master local and spark-shell --master yarn (two different Spark deploy modes).

Steps to reproduce the issue:

{code:scala}
import scala.collection.JavaConverters._
import java.sql.Date

import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}

import spark.implicits._

sql("drop table if exists order").show()
sql("drop table if exists order_hist").show()
sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()

// Target table "order": 10 rows, written via the DataFrame API.
val initframe = sc.parallelize(1 to 10, 4)
  .map { x => ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDF("id", "name", "c_name", "quantity", "price", "state")

initframe.write
  .format("carbondata")
  .option("tableName", "order")
  .option("partitionColumns", "c_name")
  .mode(SaveMode.Overwrite)
  .save()

val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")

// Source dataset: ids 3..10 (state changed to 2 for ids 3 and 4)
// plus two new rows, newid1 and newid2.
val ds1 = sc.parallelize(3 to 10, 4)
  .map { x =>
    if (x <= 4) {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 2)
    } else {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
    }
  }.toDF("id", "name", "c_name", "quantity", "price", "state")
ds1.show()

val ds2 = sc.parallelize(1 to 2, 4)
  .map { x => ("newid" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDS().toDF()
ds2.show()

val ds3 = ds1.union(ds2)
ds3.show()

val odsframe = ds3.as("B")

var matches = Seq.empty[MergeMatch]
val updateMap = Map(col("id") -> col("A.id"),
  col("price") -> expr("B.price + 1"),
  col("state") -> col("B.state"))

val insertMap = Map(col("id") -> col("B.id"),
  col("name") -> col("B.name"),
  col("c_name") -> col("B.c_name"),
  col("quantity") -> col("B.quantity"),
  col("price") -> expr("B.price * 100"),
  col("state") -> col("B.state"))

val insertMap_u = Map(col("id") -> col("id"),
  col("name") -> col("name"),
  col("c_name") -> lit("insert"),
  col("quantity") -> col("quantity"),
  col("price") -> expr("price"),
  col("state") -> col("state"))

val insertMap_d = Map(col("id") -> col("id"),
  col("name") -> col("name"),
  col("c_name") -> lit("delete"),
  col("quantity") -> col("quantity"),
  col("price") -> expr("price"),
  col("state") -> col("state"))

matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state")))
  .addAction(UpdateAction(updateMap))
  .addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget()
  .addAction(DeleteAction())
  .addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))
{code}
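Note: the script above builds the match actions but never shows the merge being executed. Judging from the unused imports (CarbonMergeDataSetCommand, MergeDataSetMatches), the merge was presumably invoked with something like the sketch below; the join key on id is an assumption, since the actual call is not included in the report.

{code:scala}
// Hypothetical merge invocation (not shown in the original report):
// target dataset A merged with source dataset B, joined on "id",
// applying the matches built above.
CarbonMergeDataSetCommand(dwSelframe, odsframe,
  MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList))
  .run(spark)
{code}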

 

 
 

Verification queries:
{code:java}
sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()
{code}

Results in spark-shell --master yarn:
{code}
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+

scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+

scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+
{code}

Results in spark-shell --master local:
{code}
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+

scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+

scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
| 7500|
+-----+

scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+

scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+
{code}



> Merge Update and Insert with Partition Table is giving different results in different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3851
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
>             Project: CarbonData
>          Issue Type: Bug
>          Components: spark-integration
>    Affects Versions: 2.0.0
>            Reporter: Sachin Ramachandra Setty
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)