Hi all,

For the past few days I have been trying to tune the performance parameters to improve the data loading speed (CarbonData with Spark) for a new customer. During the course of tuning I noticed a few points and got stuck, as they seem to be base behaviour. Could you help me understand the rationale behind them? Perhaps you have better suggestions.

1. When we load data from a folder containing many small files, we distribute the splits so that all available nodes in the cluster are used (based on file availability on the nodes) and launch a task on an executor on each node. On the executor, a split contains multiple blocks (i.e. multiple files, so multiple record readers/input iterators to read from). We create the output iterators based on the `carbon.number.of.cores.while.loading`/`spark.executor.cores` property. If this is configured as 4, then 4 carbondata files are written in the NO_SORT case. So each executor writes as many carbondata files as the `carbon.number.of.cores.while.loading` or `spark.executor.cores` property, which leads to many small carbondata files being generated for NO_SORT.

My questions are:
1. Why do we distribute tasks among many nodes even if the amount of data to load is small? Why not take the file sizes into account as well and launch a minimal number of tasks?
2. On the executor, why not control the number of output iterators (and in turn the carbondata files written) based on the number of records being processed? I understand that we can set the dynamic property `carbon.number.of.cores.while.loading` to 1 to make an executor write a single carbondata file (see the sketch right after this mail), but it is difficult to decide manually when to reduce it to a lower value in such cases.

2. When we do an `insert into table ... select from ...` or `create table ... as select from ...`, we launch a single task per node (with CoalescedRDDPartition) based on data locality, irrespective of the sort_scope of the target table. It has record readers/input iterators (with LazyRDDInternalRowIterator) for each file to read. Whereas for a simple `select * from table` query, the number of tasks launched equals the number of carbondata files (CARBON_TASK_DISTRIBUTION_BLOCK).

My question is:
1. Why do we launch only one task per node (i.e. per data-local node) during data load? If the cluster has more executor instances available per node, why not launch multiple tasks (based, of course, on data availability on the node)? That could probably improve load performance.

Please help me understand the rationale behind this behaviour. Perhaps you have better suggestions, or this is planned as future work? Your help would be greatly appreciated. Thank you in advance.

Regards,
Venu
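For reference, a minimal sketch of the manual workaround mentioned in question 2 above, assuming a Scala/spark-shell session that is already CarbonData-enabled; the table name and input path are illustrative:

import org.apache.spark.sql.SparkSession

// CarbonData session setup (CarbonSession / carbon extensions) is
// environment-specific and omitted here.
val spark = SparkSession.builder().appName("small-no-sort-load").getOrCreate()

// Lower the dynamically configurable property so each executor opens a
// single output writer for this NO_SORT load and writes one carbondata
// file instead of one per loading core.
spark.sql("SET carbon.number.of.cores.while.loading=1")

spark.sql(
  """LOAD DATA INPATH 'hdfs://nn/data/small_csv_dir'
    |INTO TABLE sales_no_sort""".stripMargin)

// Raise the value again before the next large load, e.g.
// spark.sql("SET carbon.number.of.cores.while.loading=4")

The downside is that this has to be decided per load by hand, which is exactly the difficulty described above.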
This mechanism will work fine for LOCAL_SORT loading of big data on a small cluster with big executors. If those conditions are not met, it is better to consider a new solution that adapts to the generic scenario.

I suggest re-factoring NO_SORT; maybe we can check and improve the global_sort solution. The solution should support both NO_SORT and GLOBAL_SORT, and automatically determine the number of partitions to avoid the small-file issue (a rough sketch follows below).

-----
Best Regards
David Cai
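As a rough illustration only (this is not existing CarbonData code): the number of partitions could be derived from the total input size and a target output file size, for example as below. The 1 GB target and the parallelism cap are illustrative assumptions.

def derivePartitionCount(totalInputBytes: Long,
                         targetFileBytes: Long = 1024L * 1024 * 1024,
                         maxParallelism: Int = 200): Int = {
  // One partition per targetFileBytes of input, at least 1, capped so a
  // very large load does not explode into too many tasks.
  val bySize = math.ceil(totalInputBytes.toDouble / targetFileBytes).toInt
  math.min(math.max(1, bySize), maxParallelism)
}

// 150 MB of small input files -> 1 partition -> 1 carbondata file per load,
// instead of (nodes x loading cores) small files.
val partitionsForSmallLoad = derivePartitionCount(150L * 1024 * 1024)
// 500 GB of input -> capped at 200 partitions.
val partitionsForBigLoad = derivePartitionCount(500L * 1024 * 1024 * 1024)

Such a rule would also give the user a single cap to control the maximum number of partitions when the data size is very big.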
Hi Varun,
Yes, previously most cases were tuned for LOCAL_SORT, where merging happens automatically. But the data loading flow can certainly be improved to decide this based on data size rather than a fixed configuration. However, the old behaviour might still be required if the user has to control the maximum number of partitions when the data size is very big. This configuration exists because the data loading cores are not transparent to Spark, mainly in the LOCAL_SORT case.

The same applies to the insert-into scenario: as you said, coalescing will reduce the load performance.

Regards,
Ramana
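To make the coalescing point concrete, a generic Spark sketch (this is not the CarbonData insert flow itself); the table name and partition counts are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// coalesce() narrows to fewer partitions without a shuffle -- cheap, but it
// caps the number of concurrent writer tasks, which is what limits the
// insert flow to roughly one task per node.
val coalesced = spark.table("src_table").coalesce(4)

// repartition() restores write parallelism, at the cost of a full shuffle
// of the selected data.
val repartitioned = spark.table("src_table").repartition(32)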
Hi Venu,
As @Ramana mentioned, most of the cases were optimized for local sort. Yes, we can use a global-sort-like solution for the no-sort case so that the number of tasks is based on the number of executors and cores, and we can use compaction after load to handle small files.

I remember we do have one block assignment strategy for loading based on a minimum size. Can you please check? This feature may need stabilization; hopefully it works as expected.

org.apache.carbondata.processing.util.CarbonLoaderUtil.BlockAssignmentStrategy

  /**
   * The node loads the smallest amount of data
   */
  @CarbonProperty
  public static final String CARBON_LOAD_MIN_SIZE_INMB = "load_min_size_inmb";

  /**
   * the node minimum load data default value
   */
  public static final String CARBON_LOAD_MIN_SIZE_INMB_DEFAULT = "0";

-Regards
Kumar Vishal
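A hedged usage example of the load option behind that constant, assuming a CarbonData-enabled `spark` session; the 512 MB value, table name and input path are illustrative, and the behaviour should be verified on the version in use:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate() // CarbonData-enabled session assumed

// With a minimum load size per node, a small load is assigned to fewer
// nodes, so fewer tasks are launched and fewer output files are produced.
spark.sql(
  """LOAD DATA INPATH 'hdfs://nn/data/small_csv_dir'
    |INTO TABLE sales_no_sort
    |OPTIONS('load_min_size_inmb'='512')""".stripMargin)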
Hi Vishal,
Thank you for the response. Configuring the load option `load_min_size_inmb` helped to control the number of tasks launched when loading from CSV files, and together with the dynamic property `carbon.number.of.cores.while.loading` it eventually reduced the number of output carbondata files from each executor.

But for the insert-into-table-select-from flow (`loadDataFrame()`) the problem is not resolved, as it has a completely different task-launching approach from `loadDataFile()`. Do you have suggestions on any parameter to fine-tune the insert flow?

1. Is there any way to launch more than one task per node?
2. Is there any way to control the number of output carbondata files for the target table when there are too many small carbondata files to read/select from in the source table? Otherwise it generates as many output files as input files.

-> I tried the carbon property `carbon.task.distribution`=`merge_small_files` and it could reduce the number of files generated for the target table. The scan RDD with CARBON_TASK_DISTRIBUTION_MERGE_FILES uses a mechanism similar to the global partition load (it considers filesMaxPartitionBytes, filesOpenCostInBytes and defaultParallelism for the split size). But this property is not dynamically configurable, probably for some reason? I am not sure whether it is a good option to use in this scenario (a sketch of what I tried is below). Any suggestions would be very helpful.

Regards,
Venu
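The sketch referred to above, with illustrative table names. The static property goes into carbon.properties on the driver (it is not a dynamic/SET-able property, as noted), and only the insert statement itself runs through SQL:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate() // CarbonData-enabled session assumed

// carbon.properties on the driver (static configuration, not SET-able):
//
//   carbon.task.distribution=merge_small_files
//
// With this in place, the scan RDD packs many small carbondata files of the
// source table into fewer splits (based on filesMaxPartitionBytes,
// filesOpenCostInBytes and defaultParallelism), so the insert writes fewer,
// larger files into the target table.
spark.sql("INSERT INTO target_table SELECT * FROM src_table")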