[ https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sachin Ramachandra Setty updated CARBONDATA-3851: ------------------------------------------------- Description: The result sets are different when the queries are run in spark-shell --master local and spark-shell --master yarn (Two Different Spark Deploy Modes) Steps to Reproduce Issue : import scala.collection.JavaConverters._ import java.sql.Date import org.apache.spark.sql._ import org.apache.spark.sql.CarbonSession._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType} import spark.implicits._ sql("drop table if exists order").show() sql("drop table if exists order_hist").show() sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show() val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state") initframe.write .format("carbondata") .option("tableName", "order") .option("partitionColumns", "c_name") .mode(SaveMode.Overwrite) .save() val dwframe = spark.read.format("carbondata").option("tableName", "order").load() val dwSelframe = dwframe.as("A") val ds1 = sc.parallelize(3 to 10, 4) .map {x => if (x <= 4) { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) } else { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) } }.toDF("id", "name", "c_name", "quantity", "price", "state") ds1.show() val ds2 = 
sc.parallelize(1 to 2, 4) .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) }.toDS().toDF() ds2.show() val ds3 = ds1.union(ds2) ds3.show() val odsframe = ds3.as("B") var matches = Seq.empty[MergeMatch] val updateMap = Map(col("id") -> col("A.id"), col("price") -> expr("B.price + 1"), col("state") -> col("B.state")) val insertMap = Map(col("id") -> col("B.id"), col("name") -> col("B.name"), col("c_name") -> col("B.c_name"), col("quantity") -> col("B.quantity"), col("price") -> expr("B.price * 100"), col("state") -> col("B.state")) val insertMap_u = Map(col("id") -> col("id"), col("name") -> col("name"), col("c_name") -> lit("insert"), col("quantity") -> col("quantity"), col("price") -> expr("price"), col("state") -> col("state")) val insertMap_d = Map(col("id") -> col("id"), col("name") -> col("name"), col("c_name") -> lit("delete"), col("quantity") -> col("quantity"), col("price") -> expr("price"), col("state") -> col("state")) matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist")))) matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist")))) sql("select count(*) from order").show() sql("select count(*) from order where state = 2").show() sql("select price from order where id = 'newid1'").show() sql("select count(*) from order_hist where c_name = 'delete'").show() sql("select count(*) from order_hist where c_name = 'insert'").show() *Results in spark-shell --master yarn * scala> sql("select count(*) from order").show() +--------+ |count(1)| +--------+ | 10| +--------+ scala> sql("select count(*) from order where state = 2").show() +--------+ |count(1)| +--------+ | 0| +--------+ scala> sql("select price from order where id = 'newid1'").show() +-----+ 
|price| +-----+ +-----+ scala> sql("select count(*) from order_hist where c_name = 'delete'").show() +--------+ |count(1)| +--------+ | 0| +--------+ scala> sql("select count(*) from order_hist where c_name = 'insert'").show() +--------+ |count(1)| +--------+ | 0| +--------+ *Results in spark-shell --master local* scala> sql("select count(*) from order").show() +--------+ |count(1)| +--------+ | 10| +--------+ scala> sql("select count(*) from order where state = 2").show() +--------+ |count(1)| +--------+ | 2| +--------+ scala> sql("select price from order where id = 'newid1'").show() +-----+ |price| +-----+ | 7500| +-----+ scala> sql("select count(*) from order_hist where c_name = 'delete'").show() +--------+ |count(1)| +--------+ | 2| +--------+ scala> sql("select count(*) from order_hist where c_name = 'insert'").show() +--------+ |count(1)| +--------+ | 2| +--------+ was:The result sets are different when run the queries in spark-shell--master local and spark-shell --master yarn (Two Different Spark Deploy Modes) > Merge Update and Insert with Partition Table is giving different results in different spark deploy modes > -------------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-3851 > URL: https://issues.apache.org/jira/browse/CARBONDATA-3851 > Project: CarbonData > Issue Type: Bug > Components: spark-integration > Affects Versions: 2.0.0 > Reporter: Sachin Ramachandra Setty > Priority: Major > > The result sets are different when the queries are run in spark-shell --master local and spark-shell --master yarn (Two Different Spark Deploy Modes) > Steps to Reproduce Issue : > import scala.collection.JavaConverters._ > import java.sql.Date > > import org.apache.spark.sql._ > import org.apache.spark.sql.CarbonSession._ > import org.apache.spark.sql.catalyst.TableIdentifier > import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, 
InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.test.util.QueryTest > import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType} > import spark.implicits._ > > sql("drop table if exists order").show() > sql("drop table if exists order_hist").show() > sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show() > > val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state") > > initframe.write > .format("carbondata") > .option("tableName", "order") > .option("partitionColumns", "c_name") > .mode(SaveMode.Overwrite) > .save() > val dwframe = spark.read.format("carbondata").option("tableName", "order").load() > val dwSelframe = dwframe.as("A") > > val ds1 = sc.parallelize(3 to 10, 4) > .map {x => > if (x <= 4) { > ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) > } else { > ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) > } > }.toDF("id", "name", "c_name", "quantity", "price", "state") > > ds1.show() > val ds2 = sc.parallelize(1 to 2, 4) > .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) > }.toDS().toDF() > ds2.show() > val ds3 = ds1.union(ds2) > ds3.show() > > val odsframe = ds3.as("B") > var matches = Seq.empty[MergeMatch] > val updateMap = Map(col("id") -> col("A.id"), > col("price") -> expr("B.price + 1"), > col("state") -> col("B.state")) > val insertMap = Map(col("id") -> col("B.id"), > col("name") -> col("B.name"), > col("c_name") -> col("B.c_name"), > col("quantity") -> col("B.quantity"), > col("price") -> expr("B.price * 100"), > col("state") -> col("B.state")) > val insertMap_u = Map(col("id") -> col("id"), > col("name") -> col("name"), > 
col("c_name") -> lit("insert"), > col("quantity") -> col("quantity"), > col("price") -> expr("price"), > col("state") -> col("state")) > val insertMap_d = Map(col("id") -> col("id"), > col("name") -> col("name"), > col("c_name") -> lit("delete"), > col("quantity") -> col("quantity"), > col("price") -> expr("price"), > col("state") -> col("state")) > matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist")))) > matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) > matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist")))) > sql("select count(*) from order").show() > sql("select count(*) from order where state = 2").show() > sql("select price from order where id = 'newid1'").show() > sql("select count(*) from order_hist where c_name = 'delete'").show() > sql("select count(*) from order_hist where c_name = 'insert'").show() > *Results in spark-shell --master yarn * > scala> sql("select count(*) from order").show() > +--------+ > |count(1)| > +--------+ > | 10| > +--------+ > scala> sql("select count(*) from order where state = 2").show() > +--------+ > |count(1)| > +--------+ > | 0| > +--------+ > scala> sql("select price from order where id = 'newid1'").show() > +-----+ > |price| > +-----+ > +-----+ > scala> sql("select count(*) from order_hist where c_name = 'delete'").show() > +--------+ > |count(1)| > +--------+ > | 0| > +--------+ > scala> sql("select count(*) from order_hist where c_name = 'insert'").show() > +--------+ > |count(1)| > +--------+ > | 0| > +--------+ > *Results in spark-shell --master local* > scala> sql("select count(*) from order").show() > +--------+ > |count(1)| > +--------+ > | 10| > +--------+ > scala> sql("select count(*) from order where state = 2").show() > +--------+ > |count(1)| > +--------+ > 
| 2| > +--------+ > scala> sql("select price from order where id = 'newid1'").show() > +-----+ > |price| > +-----+ > | 7500| > +-----+ > scala> sql("select count(*) from order_hist where c_name = 'delete'").show() > +--------+ > |count(1)| > +--------+ > | 2| > +--------+ > scala> sql("select count(*) from order_hist where c_name = 'insert'").show() > +--------+ > |count(1)| > +--------+ > | 2| > +--------+ > -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |