[ https://issues.apache.org/jira/browse/CARBONDATA-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Ramachandra Setty updated CARBONDATA-3852:
-------------------------------------------------
Description:

The result sets differ when the same SQL queries are run in spark-shell --master local and spark-shell --master yarn (two different Spark deploy modes).

{code}
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

// Setup for an auxiliary "order" table; the failing scenario below uses only "target".
val df1 = sc.parallelize(1 to 10, 4).map { x =>
  ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
}.toDF("id", "name", "c_name", "quantity", "price", "state")
df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")

val ds1 = sc.parallelize(3 to 10, 4).map { x =>
  if (x <= 4) {
    ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 2)
  } else {
    ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
  }
}.toDF("id", "name", "c_name", "quantity", "price", "state")
val ds2 = sc.parallelize(1 to 2, 4).map { x =>
  ("newid" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
}.toDS().toDF()
val ds3 = ds1.union(ds2)
val odsframe = ds3.as("B")

// Create the target table, partitioned on "value".
sql("drop table if exists target").show()
val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
val target = spark.read.format("carbondata").option("tableName", "target").load()

// Change-data-capture (CCD) feed: several changes per key, ordered by "time".
var ccd = spark.createDataFrame(Seq(
  Row("a", "10", false, 0),
  Row("a", null, true, 1),
  Row("b", null, true, 2),
  Row("c", null, true, 3),
  Row("c", "20", false, 4),
  Row("c", "200", false, 5),
  Row("e", "100", false, 6)
).asJava, StructType(Seq(StructField("key", StringType),
  StructField("newValue", StringType),
  StructField("deleted", BooleanType),
  StructField("time", IntegerType))))
ccd.createOrReplaceTempView("changes")
// Deduplicate: keep only the most recent change per key.
ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")

val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

// Merge the deduplicated changes into the partitioned target table.
target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=false").
  updateExpr(updateMap).
  whenNotMatched("B.deleted=false").
  insertExpr(insertMap).
  whenMatched("B.deleted=true").
  delete().execute()
{code}

SQL queries to run:

{code}
sql("select count(*) from target").show()
sql("select * from target order by key").show()
{code}

Results in spark-shell --master yarn:

{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
|       4|
+--------+

scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
|  a|    0|
|  b|    1|
|  c|    2|
|  d|    3|
+---+-----+
{code}

Results in spark-shell --master local:

{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
|       3|
+--------+

scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
|  c|  200|
|  d|    3|
|  e|  100|
+---+-----+
{code}
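For reference, the sketch below is a minimal sanity check, under the assumption that plain Spark DataFrame operations (not the CarbonData merge API) faithfully replay the intended CCD semantics on the initial rows. It reproduces the --master local result, which suggests the yarn run left the target table untouched:

{code}
// Sanity-check sketch (assumption: standard Spark APIs only, no CarbonData involved).
// Merge semantics being replayed:
//   matched  + deleted=false -> update value to newValue
//   matched  + deleted=true  -> delete the row
//   unmatched + deleted=false -> insert (key, newValue)
import org.apache.spark.sql.functions._

val joined = initframe.as("A").join(ccd.as("B"), col("A.key") === col("B.key"), "full_outer")
val expectedResult = joined
  .filter(col("B.key").isNull || !col("B.deleted"))   // drop keys whose latest change is a delete
  .select(
    coalesce(col("A.key"), col("B.key")).as("key"),
    when(col("B.key").isNotNull, col("B.newValue")).otherwise(col("A.value")).as("value"))
expectedResult.orderBy("key").show()
// Expected rows: (c, 200), (d, 3), (e, 100) -- i.e. count 3, matching --master local.
{code}

On that reading, the yarn output above is exactly the initial content of target, so in that mode the merge appears to have had no effect on the partitioned table.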
target.as("A").merge(ccd.as("B"), "A.key=B.key"). whenMatched("B.deleted=false"). updateExpr(updateMap). whenNotMatched("B.deleted=false"). insertExpr(insertMap). whenMatched("B.deleted=true"). delete().execute() {code} > CCD Merge with Partition Table is giving different results in different spark deploy modes > ------------------------------------------------------------------------------------------ > > Key: CARBONDATA-3852 > URL: https://issues.apache.org/jira/browse/CARBONDATA-3852 > Project: CarbonData > Issue Type: Bug > Components: spark-integration > Affects Versions: 2.0.0 > Reporter: Sachin Ramachandra Setty > Priority: Major > > The result sets are different when run the sql queries in spark-shell --master local and spark-shell --master yarn (Two Different Spark Deploy Modes) > {code} > import scala.collection.JavaConverters._ > import java.sql.Date > import org.apache.spark.sql._ > import org.apache.spark.sql.CarbonSession._ > import org.apache.spark.sql.catalyst.TableIdentifier > import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.test.util.QueryTest > import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType} > import spark.implicits._ > val df1 = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state") > df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save() > val dwframe = spark.read.format("carbondata").option("tableName", "order").load() > val dwSelframe = dwframe.as("A") > val ds1 = sc.parallelize(3 to 10, 4) > .map {x => > if (x <= 4) { > ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) > } else { > ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) > } > }.toDF("id", "name", "c_name", "quantity", "price", "state") > > val ds2 = sc.parallelize(1 to 2, 4).map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)}.toDS().toDF() > val ds3 = ds1.union(ds2) > val odsframe = ds3.as("B") > > sql("drop table if exists target").show() > val initframe = spark.createDataFrame(Seq( > Row("a", "0"), > Row("b", "1"), > Row("c", "2"), > Row("d", "3") > ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) > initframe.write > .format("carbondata") > .option("tableName", "target") > .option("partitionColumns", "value") > .mode(SaveMode.Overwrite) > .save() > > val target = spark.read.format("carbondata").option("tableName", "target").load() > var ccd = > spark.createDataFrame(Seq( > Row("a", "10", false, 0), > Row("a", null, true, 1), > Row("b", null, true, 2), > Row("c", null, true, 3), > Row("c", "20", false, 4), > Row("c", "200", false, 5), > Row("e", "100", false, 6) > ).asJava, > StructType(Seq(StructField("key", StringType), > StructField("newValue", StringType), > StructField("deleted", BooleanType), StructField("time", IntegerType)))) > > ccd.createOrReplaceTempView("changes") > ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)") > val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]] > val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]] > target.as("A").merge(ccd.as("B"), "A.key=B.key"). > whenMatched("B.deleted=false"). > updateExpr(updateMap). > whenNotMatched("B.deleted=false"). > insertExpr(insertMap). > whenMatched("B.deleted=true"). > delete().execute() > > {code} > SQL Queries to run : > {code} > sql("select count(*) from target").show() > sql("select * from target order by key").show() > {code} > Results in spark-shell --master yarn > {code} > scala> sql("select count(*) from target").show() > +--------+ > |count(1)| > +--------+ > | 4| > +--------+ > scala> sql("select * from target order by key").show() > +---+-----+ > |key|value| > +---+-----+ > | a| 0| > | b| 1| > | c| 2| > | d| 3| > +---+-----+ > {code} > Results in spark-shell --master local > {code} > scala> sql("select count(*) from target").show() > +--------+ > |count(1)| > +--------+ > | 3| > +--------+ > scala> sql("select * from target order by key").show() > +---+-----+ > |key|value| > +---+-----+ > | c| 200| > | d| 3| > | e| 100| > +---+-----+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |