[jira] [Resolved] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Resolved] (CARBONDATA-3902) Query on partition table gives incorrect results after Delete records using CDC

Akash R Nilugal (Jira)

     [ https://issues.apache.org/jira/browse/CARBONDATA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Akash R Nilugal resolved CARBONDATA-3902.
-----------------------------------------
    Fix Version/s: 2.1.0
       Resolution: Fixed

> Query on partition table gives incorrect results after Delete records using CDC
> -------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3902
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3902
>             Project: CarbonData
>          Issue Type: Bug
>            Reporter: Indhumathi Muthumurugesh
>            Priority: Major
>             Fix For: 2.1.0
>
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Steps to Reproduce Issue :
> {code:java}
> import scala.collection.JavaConverters.
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits.
> sql("drop table if exists target").show()
> val initframe = spark.createDataFrame(Seq(
> Row("a", "0"),
> Row("b", "1"),
> Row("c", "2"),
> Row("d", "3")
> ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
> initframe.write
> .format("carbondata")
> .option("tableName", "target")
> .option("partitionColumns", "value")
> .mode(SaveMode.Overwrite)
> .save()
> val target = spark.read.format("carbondata").option("tableName", "target").load()
> var ccd =
> spark.createDataFrame(Seq(
> Row("a", "10", false, 0),
> Row("a", null, true, 1),
> Row("b", null, true, 2),
> Row("c", null, true, 3),
> Row("c", "20", false, 4),
> Row("c", "200", false, 5),
> Row("e", "100", false, 6)
> ).asJava,
> StructType(Seq(StructField("key", StringType),
> StructField("newValue", StringType),
> StructField("deleted", BooleanType), StructField("time", IntegerType))))
> ccd.createOrReplaceTempView("changes")
> ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
> val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
> val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
> target.as("A").merge(ccd.as("B"), "A.key=B.key").
> whenMatched("B.deleted=true").
> delete().execute(){code}
>  
>  After this delete operation, partition 0, 1 and 2 should have deleted from it.
> Actual:
> {color:#067d17}select * from target order by key;{color}
> {color:#067d17}+---+-----+
> |key|value|
> +---+-----+
> |a |0 |
> |b |1 |
> |c |2 |
> |d |3 |
> +---+-----+{color}
> {color:#067d17}Expected:{color}
> {color:#067d17}+---+-----+
> |key|value|
> +---+-----+
> |d |3 |
> +---+-----+{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)