[jira] [Updated] (CARBONDATA-3851) Merge Update and Insert with Partition Table is giving different results in different spark deploy modes

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Updated] (CARBONDATA-3851) Merge Update and Insert with Partition Table is giving different results in different spark deploy modes

Akash R Nilugal (Jira)

     [ https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Ramachandra Setty updated CARBONDATA-3851:
-------------------------------------------------
    Description:
The result sets are different when run the queries in spark-shell--master local and spark-shell --master yarn (Two Different Spark Deploy Modes)

Steps to Reproduce Issue :

    import scala.collection.JavaConverters._
    import java.sql.Date
   
    import org.apache.spark.sql._
    import org.apache.spark.sql.CarbonSession._
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.test.util.QueryTest
    import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
    import spark.implicits._
       
   sql("drop table if exists order").show()
    sql("drop table if exists order_hist").show()
    sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
       
        val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")
       
    initframe.write
      .format("carbondata")
      .option("tableName", "order")
      .option("partitionColumns", "c_name")
      .mode(SaveMode.Overwrite)
      .save()

    val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
    val dwSelframe = dwframe.as("A")

       
        val ds1 = sc.parallelize(3 to 10, 4)
      .map {x =>
        if (x <= 4) {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
        } else {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
        }
      }.toDF("id", "name", "c_name", "quantity", "price", "state")
         
    ds1.show()
    val ds2 = sc.parallelize(1 to 2, 4)
      .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)
      }.toDS().toDF()
    ds2.show()
    val ds3 = ds1.union(ds2)
        ds3.show()
       
val odsframe = ds3.as("B")


   var matches = Seq.empty[MergeMatch]
   val updateMap = Map(col("id") -> col("A.id"),
      col("price") -> expr("B.price + 1"),
      col("state") -> col("B.state"))

    val insertMap = Map(col("id") -> col("B.id"),
      col("name") -> col("B.name"),
      col("c_name") -> col("B.c_name"),
      col("quantity") -> col("B.quantity"),
      col("price") -> expr("B.price * 100"),
      col("state") -> col("B.state"))

    val insertMap_u = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("insert"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

    val insertMap_d = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("delete"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

   matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
   matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
   matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))











 

  was:
The result sets are different when run the queries in spark-shell--master local and spark-shell --master yarn (Two Different Spark Deploy Modes)

Steps to Reproduce Issue :

    import scala.collection.JavaConverters._
    import java.sql.Date
   
    import org.apache.spark.sql._
    import org.apache.spark.sql.CarbonSession._
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.test.util.QueryTest
    import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
    import spark.implicits._
       
   sql("drop table if exists order").show()
    sql("drop table if exists order_hist").show()
    sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
       
        val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")
       
    initframe.write
      .format("carbondata")
      .option("tableName", "order")
      .option("partitionColumns", "c_name")
      .mode(SaveMode.Overwrite)
      .save()

    val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
    val dwSelframe = dwframe.as("A")

       
        val ds1 = sc.parallelize(3 to 10, 4)
      .map {x =>
        if (x <= 4) {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
        } else {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
        }
      }.toDF("id", "name", "c_name", "quantity", "price", "state")
         
    ds1.show()
    val ds2 = sc.parallelize(1 to 2, 4)
      .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)
      }.toDS().toDF()
    ds2.show()
    val ds3 = ds1.union(ds2)
        ds3.show()
       
val odsframe = ds3.as("B")


   var matches = Seq.empty[MergeMatch]
   val updateMap = Map(col("id") -> col("A.id"),
      col("price") -> expr("B.price + 1"),
      col("state") -> col("B.state"))

    val insertMap = Map(col("id") -> col("B.id"),
      col("name") -> col("B.name"),
      col("c_name") -> col("B.c_name"),
      col("quantity") -> col("B.quantity"),
      col("price") -> expr("B.price * 100"),
      col("state") -> col("B.state"))

    val insertMap_u = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("insert"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

    val insertMap_d = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("delete"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

   matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
   matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
   matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))

sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()


*Results in spark-shell --master yarn *

scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+


scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+


scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


*Results in spark-shell --master local*

 scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+


scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
| 7500|
+-----+


scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


 


> Merge Update and Insert with Partition Table is giving different results in different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3851
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
>             Project: CarbonData
>          Issue Type: Bug
>          Components: spark-integration
>    Affects Versions: 2.0.0
>            Reporter: Sachin Ramachandra Setty
>            Priority: Major
>
> The result sets are different when run the queries in spark-shell--master local and spark-shell --master yarn (Two Different Spark Deploy Modes)
> Steps to Reproduce Issue :
>     import scala.collection.JavaConverters._
>     import java.sql.Date
>    
>     import org.apache.spark.sql._
>     import org.apache.spark.sql.CarbonSession._
>     import org.apache.spark.sql.catalyst.TableIdentifier
>     import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
>     import org.apache.spark.sql.functions._
>     import org.apache.spark.sql.test.util.QueryTest
>     import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
>     import spark.implicits._
>
>    sql("drop table if exists order").show()
>     sql("drop table if exists order_hist").show()
>     sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
>
> val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")
>
>     initframe.write
>       .format("carbondata")
>       .option("tableName", "order")
>       .option("partitionColumns", "c_name")
>       .mode(SaveMode.Overwrite)
>       .save()
>     val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
>     val dwSelframe = dwframe.as("A")
>
> val ds1 = sc.parallelize(3 to 10, 4)
>       .map {x =>
>         if (x <= 4) {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
>         } else {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
>         }
>       }.toDF("id", "name", "c_name", "quantity", "price", "state")
>  
>     ds1.show()
>     val ds2 = sc.parallelize(1 to 2, 4)
>       .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)
>       }.toDS().toDF()
>     ds2.show()
>     val ds3 = ds1.union(ds2)
> ds3.show()
>
> val odsframe = ds3.as("B")
>    var matches = Seq.empty[MergeMatch]
>    val updateMap = Map(col("id") -> col("A.id"),
>       col("price") -> expr("B.price + 1"),
>       col("state") -> col("B.state"))
>     val insertMap = Map(col("id") -> col("B.id"),
>       col("name") -> col("B.name"),
>       col("c_name") -> col("B.c_name"),
>       col("quantity") -> col("B.quantity"),
>       col("price") -> expr("B.price * 100"),
>       col("state") -> col("B.state"))
>     val insertMap_u = Map(col("id") -> col("id"),
>       col("name") -> col("name"),
>       col("c_name") -> lit("insert"),
>       col("quantity") -> col("quantity"),
>       col("price") -> expr("price"),
>       col("state") -> col("state"))
>     val insertMap_d = Map(col("id") -> col("id"),
>       col("name") -> col("name"),
>       col("c_name") -> lit("delete"),
>       col("quantity") -> col("quantity"),
>       col("price") -> expr("price"),
>       col("state") -> col("state"))
>    matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))))
>    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
>    matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)