Improve carbondata CDC performance

Improve carbondata CDC performance

akashrn5
Hi community,

CDC is an important feature of CarbonData, as merging source data changes into
a target table is a common use case in data analytics. With the current CDC
design, performance is poor when the data in the target table is huge and the
input source data is large as well. This mail is to discuss improvements to
this, phase by phase.

In the offline discussion in the community, we have decided to go with
min/max based pruning in phase 1. Please find the link to the design doc
below; please check it and give your inputs/suggestions.

https://docs.google.com/document/d/1Qa4yEbKYsYo7LUnnKjKHzF-DMtRUSfjkh9arWawPLSc/edit?usp=sharing

Thanks and Regards,
Akash R

Re: Improve carbondata CDC performance

akashrn5
Hi all,

The design doc is updated; please go through it and give your
inputs/suggestions.

Thanks,

Regards,
Akash R




Re: Improve carbondata CDC performance

David CaiQiang
Hi Akash,
    You can enhance the runtime filter to improve the join performance.

    There is a rule that dynamically checks whether the join can add the
runtime filter or not.

    It would be better to push the runtime filter down into CarbonDataSourceScan,
and to avoid adding a UDF function to rewrite the plan.

   



Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi David,


Are you talking about the dynamic runtime filter pushdown of Spark? If
that is the case, I had already raised a discussion a few months back to do
something similar in CarbonData to improve the carbon join by using
CarbonData's metadata. But since Spark is already doing it, it was decided
not to make any changes on the carbon side for the same feature and add
complexity.

This design was decided in the community meeting last week; please
correct me if I have misunderstood your reply.

Also, it's not a plan change; basically, it's just adding a where filter with a
custom UDF on block paths to the current join query, which skips scanning the
unwanted block files. A rough sketch of the idea is below.
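
For illustration only, a minimal Spark sketch of this idea (the table/column
names, the blockPath column, and the hard-coded paths are placeholders, not the
actual CarbonData internals):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("cdc-block-filter-sketch")
  .master("local[*]").getOrCreate()
import spark.implicits._

// block paths that survived min/max pruning (normally computed from the index,
// hard-coded here only for illustration)
val prunedBlocks = Set("part-0-0.carbondata", "part-0-3.carbondata")
val inPrunedBlocks = udf((path: String) => prunedBlocks.contains(path))

// the target side is assumed to expose the block path of each row
val targetDf = Seq(("k1", 10, "part-0-0.carbondata"), ("k2", 20, "part-0-7.carbondata"))
  .toDF("key", "value", "blockPath")
val sourceDf = Seq(("k1", 11)).toDF("key", "newValue")

// the extra where filter skips block files that cannot contain matching rows
val merged = targetDf
  .where(inPrunedBlocks($"blockPath"))
  .join(sourceDf, Seq("key"))
merged.show()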

Regards,
Akash


Re: Improve carbondata CDC performance

akashrn5
In reply to this post by David CaiQiang
Hi,

In addition to this, the probability of false positives will be higher if we
just push a runtime filter of the source data's min/max onto the target, as it
purely depends on the source data.

I had also done a POC by just adding a range filter on the target table; a rough
sketch of that idea is below.
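
For reference, the POC-style range filter could look roughly like this (a sketch
with made-up column names, not the actual code): compute the global min/max of
the source join key and push it as a coarse filter on the target table.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{max, min}

val spark = SparkSession.builder().appName("cdc-range-filter-poc-sketch")
  .master("local[*]").getOrCreate()
import spark.implicits._

val targetDf = Seq((1L, "a"), (500L, "b"), (900L, "c")).toDF("key", "payload")
val sourceDf = Seq((480L, "b2"), (520L, "d")).toDF("key", "payload")

// global min/max of the source join key, used as a coarse range filter on the target
val Row(srcMin: Long, srcMax: Long) = sourceDf.agg(min($"key"), max($"key")).head()
val coarseTarget = targetDf.where($"key".between(srcMin, srcMax))
coarseTarget.show()

The false positives mentioned above come from this coarseness: a single global
[min, max] range of the source keeps many target blocks that contain no matching key.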

We need file-level or block-level pruning like SI does, so this approach
was decided.


Regards
Akash



Re: Improve carbondata CDC performance

David CaiQiang
I mean you can push your logic into CarbonDataSourceScan as a dynamic runtime
filter.

Actually, CarbonDataSourceScan already uses min/max zone maps as an index
filter to prune the block list (in the CarbonScanRDD.getPartition method).

We can do more things on the join query. Here I assume the source table is
much smaller than the target table.

1. when the join broadcasts the source table
    1.1 when the join columns contain the partition keys of the target
table, it can reuse the result of the broadcast to prune the partitions of
the target table.
    1.2 when the join query has some filters on the target table, use
min/max zone maps to prune the block list of the target table
    1.3 when the join query has some filters on the source table, it can use
the min/max zone maps of the join columns to match the result of the broadcast

2. when the join doesn't broadcast the source table
    2.1 when the join query has some filters on the target table, use
min/max zone maps to prune the block list of the target table
    2.2 join the source table with the min/max zone maps of the target table to
get the new block list (a rough sketch of this case is below).
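
A rough, illustrative sketch of case 2.2 (column names and the block metadata are
assumptions for the example, not the actual CarbonData structures):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdc-blocklist-join-sketch")
  .master("local[*]").getOrCreate()
import spark.implicits._

// block-level min/max metadata of the target table (in reality read from the min/max index)
case class BlockMeta(path: String, min: Long, max: Long)
val blockMeta = Seq(BlockMeta("b0", 0L, 99L), BlockMeta("b1", 100L, 199L),
                    BlockMeta("b2", 200L, 299L)).toDS()

// join-key values coming from the source table
val sourceKeys = Seq(5L, 150L).toDF("value")

// a block stays in the new block list only if at least one source value falls in its range
val newBlockList = blockMeta
  .join(sourceKeys, sourceKeys("value").between(blockMeta("min"), blockMeta("max")))
  .select("path")
  .distinct()
newBlockList.show()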

In the future, it would be better to move all driver-side pruning logic into
one place and invoke it in CarbonDataSourceScan to get the input partitions
for the scan RDD (including the min/max index, SI, partition pruning, and
dynamic filters).



Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi,

I got your point; basically, you want to make this logic useful for the normal
join also.
But I had raised a discussion for the same thing before; you can check it
here:
<http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/DISCUSSION-Join-optimization-with-Carbondata-s-metadata-tt103186.html#a103187>

Since Spark is already handling it in the newer version, everyone's opinion was
not to do the same on the Carbon side. So this will be specific to CDC; here it
is a little different, as we are joining an intermediate dataframe with the
source to get the files to scan, so this should be fine.

The only problem is with the cartesian product, as mentioned in the design doc;
you can check it and give your inputs on that. I also have one more solution:
searching in a distributed way with an interval tree data structure.

Thanks,
Akash




Re: Improve carbondata CDC performance

David CaiQiang
+1, you can finish the implementation.

How about using the following SQL instead of the cartesian join?

SELECT df.filePath
FROM targetTableBlocks df
WHERE EXISTS (SELECT 1 FROM srcTable
              WHERE srcTable.value BETWEEN df.min AND df.max)



Best Regards
David Cai

Re: Improve carbondata CDC performance

akashrn5
Hi David,

Thanks for your suggestion.

I checked the query you suggested locally; it runs as a
*BroadcastNestedLoopJoin*.
Since the dataset is small locally it chooses that, but in a cluster, when the
data size grows, it falls back to the cartesian product again.

How about our own search logic in a distributed way using an interval tree
data structure? It will be faster and won't have much impact. A rough sketch of
the idea is below.
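
A very rough, illustrative sketch of the idea (a simplified centered interval
tree over block min/max ranges, not the actual implementation; in the distributed
flow the tree would be built from the pruned splits, broadcast, and queried per
source row):

case class Block(path: String, min: Long, max: Long)

sealed trait ITree
case object Leaf extends ITree
case class Node(center: Long, byMin: Seq[Block], byMax: Seq[Block],
                left: ITree, right: ITree) extends ITree

def build(blocks: Seq[Block]): ITree =
  if (blocks.isEmpty) Leaf
  else {
    // pick the median midpoint as the node center
    val mids = blocks.map(b => (b.min + b.max) / 2).sorted
    val center = mids(mids.size / 2)
    val (overlap, rest) = blocks.partition(b => b.min <= center && center <= b.max)
    val (lower, upper) = rest.partition(_.max < center)
    Node(center, overlap.sortBy(_.min), overlap.sortBy(b => -b.max),
         build(lower), build(upper))
  }

// all block paths whose [min, max] range contains the value (duplicate min/max is fine)
def query(tree: ITree, value: Long): Seq[String] = tree match {
  case Leaf => Nil
  case Node(center, byMin, byMax, left, right) =>
    if (value < center) byMin.takeWhile(_.min <= value).map(_.path) ++ query(left, value)
    else if (value > center) byMax.takeWhile(_.max >= value).map(_.path) ++ query(right, value)
    else byMin.map(_.path)
}

val tree = build(Seq(Block("b0", 0L, 99L), Block("b1", 50L, 149L), Block("b2", 150L, 249L)))
println(query(tree, 75L))   // b0 and b1 both contain 75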

Others, please give your suggestions.

Thanks

Regards,
Akash R




Re: Improve carbondata CDC performance

VenuReddy
In reply to this post by akashrn5

Hi Akash,

+1

Just a few queries regarding the query to filter only the necessary target
files/blocks:

SELECT target.filepath
FROM targetTable, srcTable
WHERE srcTable.value BETWEEN targetTable.min AND targetTable.max;

1. This range will be one per extended blocklet, right? I mean, the number of
ranges in the condition is equal to the number of extended blocklets returned.
Do we merge these ranges (in case of overlaps) to reduce the number of
conditions?

2. Would doing an aggregate/group by on target.filepath help?

Regards,
Venu




Re: Improve carbondata CDC performance

akashrn5
Hi Venu,

Thanks for your review.

I have replied to the same in the document.
You are right:

1. It is taken care of: we group the extended blocklets by split path and get
the min/max at block level.
2. We need to do a group by on the file path to avoid duplicates in the
dataframe output. I have updated the same in the doc; please have a look. A
small sketch of these two points is below.
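
For illustration, the two points could look roughly like this in dataframe terms
(column names are assumptions, not the actual code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min}

val spark = SparkSession.builder().appName("cdc-blocklet-grouping-sketch")
  .master("local[*]").getOrCreate()
import spark.implicits._

// one row per extended blocklet: (blockFilePath, blockletMin, blockletMax)
val blocklets = Seq(("f1", 0L, 40L), ("f1", 41L, 99L), ("f2", 100L, 199L))
  .toDF("filePath", "blockletMin", "blockletMax")

// 1. group the blocklets by block file path and compute block-level min/max
val blockRanges = blocklets.groupBy("filePath")
  .agg(min("blockletMin").as("min"), max("blockletMax").as("max"))
blockRanges.show()

// 2. after joining with the source values, a group by / distinct on filePath
//    removes duplicate file paths from the dataframe output, e.g.:
// val filesToScan = joined.select("filePath").distinct()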

Thanks,
Akash R




Re: Improve carbondata CDC performance

ravipesala
+1
Instead of doing the cartesian join, we can broadcast the sorted min/max
with file paths and do the binary search inside the map function.
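
A rough local sketch of this suggestion (data and names are illustrative only,
and the left-scan over candidates is a simplification for overlapping ranges):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdc-broadcast-search-sketch")
  .master("local[*]").getOrCreate()
import spark.implicits._

case class BlockRange(path: String, min: Long, max: Long)

// block-level min/max with file paths, sorted by min and broadcast to the executors
val sortedBlocks = Array(BlockRange("b0", 0L, 99L), BlockRange("b1", 100L, 199L),
                         BlockRange("b2", 200L, 299L)).sortBy(_.min)
val bc = spark.sparkContext.broadcast(sortedBlocks)

val sourceValues = Seq(5L, 150L, 150L).toDS()

val filesToScan = sourceValues.mapPartitions { values =>
  val blocks = bc.value
  values.flatMap { v =>
    // binary search for the right-most block whose min <= v ...
    var lo = 0; var hi = blocks.length - 1; var idx = -1
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      if (blocks(mid).min <= v) { idx = mid; lo = mid + 1 } else hi = mid - 1
    }
    // ... then check the candidates to its left, since ranges can overlap or share min/max
    (idx to 0 by -1).iterator.map(blocks(_)).filter(_.max >= v).map(_.path)
  }
}.distinct()

filesToScan.show()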

Thank you



Thanks & Regards,
Ravi

Re: Improve carbondata CDC performance

kunalkapoor
+1, agree with Ravi's suggestion.


Re: Improve carbondata CDC performance

Indhumathi
+1

Regards,
Indhumathi M

Re: Improve carbondata CDC performance

Ajantha Bhat
+1 for this improvement.

But this optimization is dependent on the data: there may be a scenario where,
even after you prune with min/max, your dataset size remains almost the same as
the original, which brings in the extra overhead of the newly added operations.
Do you have a plan to add some intelligence, threshold, or fallback mechanism
for that case?

Thanks,
Ajantha


Re: Improve carbondata CDC performance

akashrn5
Hi Ajantha,

Thanks for your points.

Now we actually cache the splits, so the actual join will be faster, and even
if the pruning doesn't help it won't affect the performance much. This is what
we learned from the tests we did during the POC: it doesn't make much
difference in performance, basically no degradation as such.

Having said that, in actual use cases, or I can say customer scenarios, when
you have a huge target table in a data warehouse or data lake for analytics,
the CDC will touch only a small portion of the data. Changing the whole table
is the rarest scenario, so it should be fine.

Thanks

Regards,
Akash R




Re: Improve carbondata CDC performance

akashrn5
In reply to this post by ravipesala
Hi Ravi,

Thanks for your inputs.

Actually, the test with binary search and broadcasting didn't give much
benefit. From the code perspective as well, we need to sort the data ourselves
based on the min/max search logic for the array, and considering the scenarios
of multiple blocks with the same min and max, or the same min or max, the code
won't be very clean or modular. This is the learning from my side.

So I have updated the design in a similar way: first we deduplicate the
records, and then we use an interval tree, which handles the scenario I
mentioned above; insertion and search are fast, as learned from the test
results. The code is small and clean to understand, and the performance we get
from it is very good. So I will be updating the design based on that.

I will try to raise a PR this week for the review.

Thanks.

Regards,
Akash R




Re: Improve carbondata CDC performance

akashrn5
In reply to this post by Ajantha Bhat
Hi,

In the new design we cache the splits, and the actual join operation makes use
of them and will be faster. So, from the test results, even when the dataset
doesn't get pruned at all, it won't make any difference in performance;
basically it doesn't degrade.

As far as actual use cases go, changing the whole table is the rarest scenario,
so it should be fine.

Thanks

Regards,
Akash R


