[jira] [Updated] (CARBONDATA-3852) CCD Merge with Partition Table is giving different results in different spark deploy modes

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Updated] (CARBONDATA-3852) CCD Merge with Partition Table is giving different results in different spark deploy modes

Akash R Nilugal (Jira)

     [ https://issues.apache.org/jira/browse/CARBONDATA-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Ramachandra Setty updated CARBONDATA-3852:
-------------------------------------------------
    Description:
The result sets differ when the same SQL queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes).

{code}
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

val df1 = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")


df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")


val ds1 = sc.parallelize(3 to 10, 4)
      .map {x =>
        if (x <= 4) {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
        } else {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
        }
      }.toDF("id", "name", "c_name", "quantity", "price", "state")
         

val ds2 = sc.parallelize(1 to 2, 4).map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDS().toDF()
val ds3 = ds1.union(ds2)
val odsframe = ds3.as("B")
       
sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))

initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
 
val target = spark.read.format("carbondata").option("tableName", "target").load()

var ccd =
  spark.createDataFrame(Seq(
    Row("a", "10", false,  0),
    Row("a", null, true, 1),  
    Row("b", null, true, 2),  
    Row("c", null, true, 3),  
    Row("c", "20", false, 4),
    Row("c", "200", false, 5),
    Row("e", "100", false, 6)
  ).asJava,
    StructType(Seq(StructField("key", StringType),
      StructField("newValue", StringType),
      StructField("deleted", BooleanType), StructField("time", IntegerType))))
         
ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=false").
  updateExpr(updateMap).
  whenNotMatched("B.deleted=false").
  insertExpr(insertMap).
  whenMatched("B.deleted=true").
  delete().execute()
 
{code}

SQL queries to run:

{code}
sql("select count(*) from target").show()
sql("select * from target order by key").show()
{code}



Results in spark-shell --master yarn

{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
|       4|
+--------+


scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
|  a|    0|
|  b|    1|
|  c|    2|
|  d|    3|
+---+-----+
{code}


Results in spark-shell --master local

{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
|       3|
+--------+


scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
|  c|  200|
|  d|    3|
|  e|  100|
+---+-----+
{code}




  was:
The result sets differ when the same SQL queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes).

{code}
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

val df1 = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")


df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")


val ds1 = sc.parallelize(3 to 10, 4)
      .map {x =>
        if (x <= 4) {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
        } else {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
        }
      }.toDF("id", "name", "c_name", "quantity", "price", "state")
         

val ds2 = sc.parallelize(1 to 2, 4).map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDS().toDF()
val ds3 = ds1.union(ds2)
val odsframe = ds3.as("B")
       
sql("drop table if exists target").show()

val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))

initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
 
val target = spark.read.format("carbondata").option("tableName", "target").load()

var ccd =
  spark.createDataFrame(Seq(
    Row("a", "10", false,  0),
    Row("a", null, true, 1),  
    Row("b", null, true, 2),  
    Row("c", null, true, 3),  
    Row("c", "20", false, 4),
    Row("c", "200", false, 5),
    Row("e", "100", false, 6)
  ).asJava,
    StructType(Seq(StructField("key", StringType),
      StructField("newValue", StringType),
      StructField("deleted", BooleanType), StructField("time", IntegerType))))
         
ccd.createOrReplaceTempView("changes")

ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=false").
  updateExpr(updateMap).
  whenNotMatched("B.deleted=false").
  insertExpr(insertMap).
  whenMatched("B.deleted=true").
  delete().execute()
 
{code}


> CCD Merge with Partition Table is giving different results in different spark deploy modes
> ------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3852
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3852
>             Project: CarbonData
>          Issue Type: Bug
>          Components: spark-integration
>    Affects Versions: 2.0.0
>            Reporter: Sachin Ramachandra Setty
>            Priority: Major
>
> The result sets differ when the same SQL queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes).
> {code}
> import scala.collection.JavaConverters._
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits._
> val df1 = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")
> df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
> val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
> val dwSelframe = dwframe.as("A")
> val ds1 = sc.parallelize(3 to 10, 4)
>       .map {x =>
>         if (x <= 4) {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
>         } else {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
>         }
>       }.toDF("id", "name", "c_name", "quantity", "price", "state")
>  
> val ds2 = sc.parallelize(1 to 2, 4).map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDS().toDF()
> val ds3 = ds1.union(ds2)
> val odsframe = ds3.as("B")
>
> sql("drop table if exists target").show()
> val initframe = spark.createDataFrame(Seq(
>   Row("a", "0"),
>   Row("b", "1"),
>   Row("c", "2"),
>   Row("d", "3")
> ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
> initframe.write
>   .format("carbondata")
>   .option("tableName", "target")
>   .option("partitionColumns", "value")
>   .mode(SaveMode.Overwrite)
>   .save()
>  
> val target = spark.read.format("carbondata").option("tableName", "target").load()
> var ccd =
>   spark.createDataFrame(Seq(
>     Row("a", "10", false,  0),
>     Row("a", null, true, 1),  
>     Row("b", null, true, 2),  
>     Row("c", null, true, 3),  
>     Row("c", "20", false, 4),
>     Row("c", "200", false, 5),
>     Row("e", "100", false, 6)
>   ).asJava,
>     StructType(Seq(StructField("key", StringType),
>       StructField("newValue", StringType),
>       StructField("deleted", BooleanType), StructField("time", IntegerType))))
>  
> ccd.createOrReplaceTempView("changes")
> ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
> val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
> val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
> target.as("A").merge(ccd.as("B"), "A.key=B.key").
>   whenMatched("B.deleted=false").
>   updateExpr(updateMap).
>   whenNotMatched("B.deleted=false").
>   insertExpr(insertMap).
>   whenMatched("B.deleted=true").
>   delete().execute()
>  
> {code}
> SQL queries to run:
> {code}
> sql("select count(*) from target").show()
> sql("select * from target order by key").show()
> {code}
> Results in spark-shell --master yarn
> {code}
> scala> sql("select count(*) from target").show()
> +--------+
> |count(1)|
> +--------+
> |       4|
> +--------+
> scala> sql("select * from target order by key").show()
> +---+-----+
> |key|value|
> +---+-----+
> |  a|    0|
> |  b|    1|
> |  c|    2|
> |  d|    3|
> +---+-----+
> {code}
> Results in spark-shell --master local
> {code}
> scala> sql("select count(*) from target").show()
> +--------+
> |count(1)|
> +--------+
> |       3|
> +--------+
> scala> sql("select * from target order by key").show()
> +---+-----+
> |key|value|
> +---+-----+
> |  c|  200|
> |  d|    3|
> |  e|  100|
> +---+-----+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)