[ https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Akash R Nilugal resolved CARBONDATA-3902. ----------------------------------------- Fix Version/s: 2.1.0 Resolution: Fixed > Query on partition table gives incorrect results after Delete records using CDC > ------------------------------------------------------------------------------- > > Key: CARBONDATA-3902 > URL: https://issues.apache.org/jira/browse/CARBONDATA-3902 > Project: CarbonData > Issue Type: Bug > Reporter: Indhumathi Muthumurugesh > Priority: Major > Fix For: 2.1.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > Steps to Reproduce Issue : > {code:java} > import scala.collection.JavaConverters._ > import java.sql.Date > import org.apache.spark.sql._ > import org.apache.spark.sql.CarbonSession._ > import org.apache.spark.sql.catalyst.TableIdentifier > import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.test.util.QueryTest > import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType} > import spark.implicits._ 
> sql("drop table if exists target").show() > val initframe = spark.createDataFrame(Seq( > Row("a", "0"), > Row("b", "1"), > Row("c", "2"), > Row("d", "3") > ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) > initframe.write > .format("carbondata") > .option("tableName", "target") > .option("partitionColumns", "value") > .mode(SaveMode.Overwrite) > .save() > val target = spark.read.format("carbondata").option("tableName", "target").load() > var ccd = > spark.createDataFrame(Seq( > Row("a", "10", false, 0), > Row("a", null, true, 1), > Row("b", null, true, 2), > Row("c", null, true, 3), > Row("c", "20", false, 4), > Row("c", "200", false, 5), > Row("e", "100", false, 6) > ).asJava, > StructType(Seq(StructField("key", StringType), > StructField("newValue", StringType), > StructField("deleted", BooleanType), StructField("time", IntegerType)))) > ccd.createOrReplaceTempView("changes") > ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)") > val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]] > val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]] > target.as("A").merge(ccd.as("B"), "A.key=B.key"). > whenMatched("B.deleted=true"). > delete().execute(){code} > > After this delete operation, the records in partitions 0, 1 and 2 should have been deleted. > Actual: > {color:#067d17}select * from target order by key;{color} > {color:#067d17}+---+-----+ > |key|value| > +---+-----+ > |a |0 | > |b |1 | > |c |2 | > |d |3 | > +---+-----+{color} > {color:#067d17}Expected:{color} > {color:#067d17}+---+-----+ > |key|value| > +---+-----+ > |d |3 | > +---+-----+{color} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |