Hi community,
The Carbondata’s CDC is an important feature as CDC is an important use case in data analytics of merging the source data changes to target table. With the current design of CDC, the performance is not good when the data is huge in the target table and input source data also. This mail is to discuss about the improvements on this in phase wise. In the offline discussion in community, we have decided to go with the minmax based pruning in phase1, PFA link of design doc, please check and give your inputs/suggestions. https://docs.google.com/document/d/1Qa4yEbKYsYo7LUnnKjKHzF-DMtRUSfjkh9arWawPLSc/edit?usp=sharing Thanks and Regards, Akash R |
Hi all,
The design doc is updated, please go through and give your inputs/suggestions. Thanks, Regards, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
Hi Akash,
You can enhance the runtime filter to improve the join performance. It has the rule to dynamically check whether the join can add the runtime filter or not. Better to push down the runtime filter into CarbonDataSourceScan, and better to avoid adding a UDF function to rewrite the plan. ----- Best Regards David Cai -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Best Regards
David Cai |
Hi David,
Are you talking about the dynamic run-time filter pushdown of spark ? If that is the case, I had already raised a discussion fee months back to do the similarly in Carbondata to improve carbon join by using carbondata's metadata. But since spark is already doing it , it's decided not to do any changes in carbon side for the same feature and make it complex. So this design is decided in the community meeting last week, please correct me if I'm wrong in understanding your reply. Also it's not plan change, basically, it's just adding a where filter of custom UDF of block paths in the current join query that's all which skips the unwanted block files to scan. Regards, Akash On Thu, Feb 18, 2021, 6:36 AM David CaiQiang <[hidden email]> wrote: > Hi Akash, > You can enhance the runtime filter to improve the join performance. > > It has the rule to dynamically check whether the join can add the > runtime filter or not. > > Better to push down the runtime filter into CarbonDataSourceScan, and > better to avoid adding a UDF function to rewrite the plan. > > > > > > ----- > Best Regards > David Cai > -- > Sent from: > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ > |
In reply to this post by David CaiQiang
Hi,
In addition to this, the probability of false positives will be more when we just push the runtime filter of source data min max ok target as it purely depends on source data. Even I had done POC by just adding a range filter on target table. We need file level or block level pruning like SI does. So this approach was decided. Regards Akash On Thu, Feb 18, 2021, 6:36 AM David CaiQiang <[hidden email]> wrote: > Hi Akash, > You can enhance the runtime filter to improve the join performance. > > It has the rule to dynamically check whether the join can add the > runtime filter or not. > > Better to push down the runtime filter into CarbonDataSourceScan, and > better to avoid adding a UDF function to rewrite the plan. > > > > > > ----- > Best Regards > David Cai > -- > Sent from: > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ > |
I mean you can push your logic into CarbonDataSourceScan as a dynamic runtime
filter. Actually, CarbonDataSourceScan already used min/max zoom maps as an index filter to prune blocklist (in the CarbonScanRDD.getPartition method). We can do more things on the join query. Here I assume the source table is much smaller than the target table. 1. when the join broadcast the source table 1.1 when the join columns contain the partition keys of the target table, it can reuse the result of the broadcast to prune the partitions of the target table. 1.2 when the join query has some filters on the target table, use min/max zoom maps to prune the block list of the target table 1.3 when the join query has some filters on the source table, it can use min/max zoom maps of join columns to match the result of the broadcast 2. when the join doesn't broadcast the source table 2.1 when the join query has some filters on the target table, use min/max zoom maps to prune the block list of the target table 2.2 join source table with min/max zoom maps of the target table to get the new block list. In the future, it better to move all pruning logics of the driver side into one place and invoke them in CarbonDataSourceScan to get input partitions for ScanRDD. (include min/max index, si, partition pruning, and dynamic filters) ----- Best Regards David Cai -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Best Regards
David Cai |
Hi,
i got your point basically you wanted to make this logic to be useful for normal join also. But for the same thing i had raised a discussion before, you can check here. <http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/DISCUSSION-Join-optimization-with-Carbondata-s-metadata-tt103186.html#a103187> Since Spark already handling in new version, everyone's opinion was not to make before spark. SO this will be specific for CDC as here its little different as we are joining intermediate dataframe with source to get the files to scan. SO this should be fine. Only problem with the cartesian product as mentioned in design doc, you can check and give your inputs on that, i also have one more solution to search in a distributed way with a interval tree data structure. Thanks, Akash -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
+1, you can finish the implementation.
How about using the following SQL instead of the cartesian join? SELECT df.filePath FROM targetTableBlocks df where exists (select 1 from srcTable where srcTable.value between df.min and df.max) ----- Best Regards David Cai -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Best Regards
David Cai |
Hi david,
Thanks for your suggestion. I checked in local about the query you suggested, its going as a *BroadcastNestedLoopJoin*. As in local dataset is small it goes for that, but in cluster when the data size grows it goes back to cartesian product again. How about our own search logic in a distributed way using Interval tree datastructure? it will be faster and wont impact much. Other please give your suggestions. Thanks Regards, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
In reply to this post by akashrn5
Hi Akash, +1 Just ad few queries regd. query to filter only necessary target files/blocks. - SELECT target.filepath FROM targetTable,srcTable WHERE srcTable.value BETWEEN targetTable.min AND targetTable.max; 1. This range will be one per the extended blocklet. right ? I mean, number of ranges in the condition is equal to number extended blockets returned. Do we merge these ranges(in case of overlaps ) to reduce the number of conditions?? 2. Doing aggregate/group by target.filepath helps? Regards, Venu -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
Hi Venu,
Thanks for your review. I have replied the same in the document. you are right 1. its taken care to group by extended blocklets on split path and get the min-max on block level 2. we need to do group by on the file path to avoid the duplicates from dataframe output. I have updated the same in the doc please have a look. Thanks, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
+1
Instead of doing the cartesian join, we can broadcast the sorted min/max with file paths and do the binary search inside the map function. Thank you On Wed, 24 Feb 2021 at 13:02, akashrn5 <[hidden email]> wrote: > Hi Venu, > > Thanks for your review. > > I have replied the same in the document. > you are right > > 1. its taken care to group by extended blocklets on split path and get the > min-max on block level > 2. we need to do group by on the file path to avoid the duplicates from > dataframe output. I have updated the same in the doc please have a look. > > Thanks, > Akash R > > > > -- > Sent from: > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ > -- Thanks & Regards, Ravi |
+1, agree with ravi's suggestion
On Thu, Mar 11, 2021 at 7:53 PM Ravindra Pesala <[hidden email]> wrote: > +1 > Instead of doing the cartesian join, we can broadcast the sorted min/max > with file paths and do the binary search inside the map function. > > Thank you > > On Wed, 24 Feb 2021 at 13:02, akashrn5 <[hidden email]> wrote: > > > Hi Venu, > > > > Thanks for your review. > > > > I have replied the same in the document. > > you are right > > > > 1. its taken care to group by extended blocklets on split path and get > the > > min-max on block level > > 2. we need to do group by on the file path to avoid the duplicates from > > dataframe output. I have updated the same in the doc please have a look. > > > > Thanks, > > Akash R > > > > > > > > -- > > Sent from: > > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ > > > > > -- > Thanks & Regards, > Ravi > |
+1
Regards, Indhumathi M -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
+1 for this improvement,
But as this optimization is dependent on the data. There may be a scenario where after you prune with min max also your dataset size remain almost same as original. Which brings in extra overhead of the new operations added. Do you have plan to add some intelligence or threshold or fallback mechanism for that case ? Thanks, Ajantha On Mon, Mar 29, 2021 at 5:59 PM Indhumathi <[hidden email]> wrote: > +1 > > Regards, > Indhumathi M > > > > -- > Sent from: > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ > |
Hi Ajantha,
Thanks for your points. Now actually we cache the splits, actual join will be faster and, even though the pruning doesn't happen it wont affect the performance much. This is learned from the test we did during POC and it doesn't make much difference in performance, basically no degrade as such. And having said that, in actual use cases or I can say customer scenario, when you have huge target table in data warehouse or data lake for analytics, the CDC will be on small portion of data itself. Changing the whole table is rarest scenario, so it should be fine. Thanks Regards, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
In reply to this post by ravipesala
Hi Ravi,
Thanks for your inputs. Actually, the test with binary search and broadcasting didn't give much benefit and from code perspective also the we need to sort the data our self based on min max search logic for the array, and also considering the scenarios of multiple blocks with same min and max, same min or max, the code wont be much cleaner or modular. This is learning from my side. So I have updated the design in a similar way, like first we deduplicate the records and we will have a interval tree which handled the scenario i mentioned above and insertion and search cost it fast as learned from test result. Code is very less and clean to understand. the performance we get from it very good. So I will be updating the design based on that. I will try to rise PR in this week for the review. Thanks. Regards, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
In reply to this post by Ajantha Bhat
Hi,
In new design we cache the splits and actual join operation makes use of it and will be faster. So from the test results even though the dataset didn't prune anything, it wont make any difference in performance. Basically doesn't degrade. As far as actual use case, the changing of who table is rarest scenario, so it should be fine. Thanks Regards, Akash R -- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ |
Free forum by Nabble | Edit this page |