[ https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sachin Ramachandra Setty closed CARBONDATA-3851. ------------------------------------------------ This PR resolved this issue. https://github.com/apache/carbondata/pull/3835 > Merge Update and Insert with Partition Table is giving different results in different spark deploy modes > -------------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-3851 > URL: https://issues.apache.org/jira/browse/CARBONDATA-3851 > Project: CarbonData > Issue Type: Bug > Components: spark-integration > Affects Versions: 2.0.0 > Reporter: Sachin Ramachandra Setty > Priority: Major > Fix For: 2.1.0 > > > The result sets differ when the queries are run in spark-shell --master local and in spark-shell --master yarn (two different Spark deploy modes). > Steps to reproduce the issue: > {code} > import scala.collection.JavaConverters._ > import java.sql.Date > import org.apache.spark.sql._ > import org.apache.spark.sql.CarbonSession._ > import org.apache.spark.sql.catalyst.TableIdentifier > import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.test.util.QueryTest > import org.apache.spark.sql.types. 
> {BooleanType, DateType, IntegerType, StringType, StructField, StructType} > import spark.implicits._ > sql("drop table if exists order").show() > sql("drop table if exists order_hist").show() > sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show() > val initframe = sc.parallelize(1 to 10, 4).map > { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)} > .toDF("id", "name", "c_name", "quantity", "price", "state") > initframe.write > .format("carbondata") > .option("tableName", "order") > .option("partitionColumns", "c_name") > .mode(SaveMode.Overwrite) > .save() > val dwframe = spark.read.format("carbondata").option("tableName", "order").load() > val dwSelframe = dwframe.as("A") > val ds1 = sc.parallelize(3 to 10, 4) > .map {x => > if (x <= 4) > { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) } > else > { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) } > }.toDF("id", "name", "c_name", "quantity", "price", "state") > ds1.show() > val ds2 = sc.parallelize(1 to 2, 4) > .map > {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) } > .toDS().toDF() > ds2.show() > val ds3 = ds1.union(ds2) > ds3.show() > val odsframe = ds3.as("B") > var matches = Seq.empty[MergeMatch] > val updateMap = Map(col("id") -> col("A.id"), > col("price") -> expr("B.price + 1"), > col("state") -> col("B.state")) > val insertMap = Map(col("id") -> col("B.id"), > col("name") -> col("B.name"), > col("c_name") -> col("B.c_name"), > col("quantity") -> col("B.quantity"), > col("price") -> expr("B.price * 100"), > col("state") -> col("B.state")) > val insertMap_u = Map(col("id") -> col("id"), > col("name") -> col("name"), > col("c_name") -> lit("insert"), > col("quantity") -> col("quantity"), > col("price") -> expr("price"), > col("state") -> col("state")) > val insertMap_d = Map(col("id") -> col("id"), > col("name") -> col("name"), > col("c_name") -> lit("delete"), > col("quantity") -> 
col("quantity"), > col("price") -> expr("price"), > col("state") -> col("state")) > matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist")))) > matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) > matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist")))) > {code} > > SQL Queries : > {code} > sql("select count(*) from order").show() > sql("select count(*) from order where state = 2").show() > sql("select price from order where id = 'newid1'").show() > sql("select count(*) from order_hist where c_name = 'delete'").show() > sql("select count(*) from order_hist where c_name = 'insert'").show() > {code} > Results in spark-shell --master yarn > {code} > scala> sql("select count(*) from order").show() > +--------+ > |count(1)| > +--------+ > |10| > +--------+ > scala> sql("select count(*) from order where state = 2").show() > +--------+ > |count(1)| > +--------+ > |0| > +--------+ > scala> sql("select price from order where id = 'newid1'").show() > +-----+ > |price| > +-----+ > +-----+ > scala> sql("select count(*) from order_hist where c_name = 'delete'").show() > +--------+ > |count(1)| > +--------+ > |0| > +--------+ > scala> sql("select count(*) from order_hist where c_name = 'insert'").show() > +--------+ > |count(1)| > +--------+ > |0| > +--------+ > {code} > Results in spark-shell --master local > {code} > scala> sql("select count(*) from order").show() > +--------+ > |count(1)| > +--------+ > |10| > +--------+ > scala> sql("select count(*) from order where state = 2").show() > +--------+ > |count(1)| > +--------+ > |2| > +--------+ > scala> sql("select price from order where id = 'newid1'").show() > +-----+ > |price| > +-----+ > |7500| > +-----+ > scala> sql("select count(*) from order_hist where c_name = 
'delete'").show() > +--------+ > |count(1)| > +--------+ > |2| > +--------+ > scala> sql("select count(*) from order_hist where c_name = 'insert'").show() > +--------+ > |count(1)| > +--------+ > |2| > +--------+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |