[Discussion]Query Regarding Task launch mechanism for data load operations

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

[Discussion]Query Regarding Task launch mechanism for data load operations

VenuReddy
Hi all.!

For the past few days, was trying to tune the perfomance parameters to
improve the data loading speed(Carbon with Spark) for a new customer. During
the course of tuning, have noticed a few points and stuck there as it seem
to be base behavior. Could you guys help me to understand the rationale
behind it and probably you have any better suggestions.

1. When load data with a folder having too many small files, we are trying
to distribute splits such that we use all the availble nodes in
cluster(based on file availability on nodes) and lauch a task to executor on
each node. On executor, split contains multiple blocks( i.e., multiple
files. so multiple record readers/input iterators to read from). We make
output iterators based on
/carbon.number.of.cores.while.loading///spark.executor.cores/ property. If
configured as 4, then 4 carbondata files are written in case of no_sort. So
each executor will write the number of carbon files equal to
/carbon.number.of.cores.while.loading/ or /spark.executor.cores/ property.
It leads to generation of many small carbondata files in case of no_sort

*My questions are:
1. Why do we distribute tasks among many nodes even if amount of data to
load is small? Why not consider the file sizes as well into account and
lauch minimal tasks ?
2. Also on executor, why not control the number of output iterators(and in
turn carbondata files to write) considering the amount of records being
processed ? I understand the fact that we can set
/carbon.number.of.cores.while.loading/ dynamic property to 1 to make
executor write a single carbondata file. But it would be difficult to decide
manually when to reduce to lower value in such cases.*

2. When we do /insert into table select from/ or /create table as select
from/, we lauch one single task per node(with CoalescedRDDPartition) based
on the locality of data irrespective of sort_scope of target table.  It has
record readers/input Iterators(with LazyRDDInternalRowIterator) for each
file to read. Whereas when we do a simple /select * from table/ query, tasks
launched are equal to number of carbondata files with
CARBON_TASK_DISTRIBUTION_BLOCK.

*My questions are:
1. Why do we lunch one task per node(i.e on data available node) in data
load? If  have a cluster where more executor instances are available per
node, why not launch multiple tasks(ofcource based on data availability on
node) ? Probably this could have improved load performance ?
*

Request you guys help me to understand the rationale behind it and probably
you have any better suggestions or if it is planned in future work scope ?  

Your help would be greatly appreciated. Tthank you in advance

Regards,
Venu



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: [Discussion]Query Regarding Task launch mechanism for data load operations

David CaiQiang
This mechanism will work fine for LOCAL_SORT loading of big data and the
small cluster with big executor.

If it doesn't match these conditions, better consider a new solution to
adapter the generic scenario.

I suggest re-factoring NO_SORT, maybe we can check and improve the
global_sort solution.

The solution should support both NO_SORT and GLOBAL_SORT, and automatically
determines the number of partitions to avoid small file issue.




-----
Best Regards
David Cai
--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Best Regards
David Cai
Reply | Threaded
Open this post in threaded view
|

Re: [Discussion]Query Regarding Task launch mechanism for data load operations

Venkata Gollamudi
Hi Varun,

Yes, previously most cases were tuned for LOCAL_SORT, where merging will
automatically happen.  But certainly data loading flow can be improved to
do it based on data size, rather than a fixed configuration.
However old behaviour might also be required, if the user has to control
the maximum number of partitions in case data size is too big.  This
configuration has started as data loading cores are not transparent to
spark, mainly in case of LOCAL_SORT.

Same thing is applicable for insert into scenario also, as you said
coalescing will reduce the load performance.

Regards,
Ramana

On Fri, Aug 14, 2020 at 3:25 PM David CaiQiang <[hidden email]> wrote:

> This mechanism will work fine for LOCAL_SORT loading of big data and the
> small cluster with big executor.
>
> If it doesn't match these conditions, better consider a new solution to
> adapter the generic scenario.
>
> I suggest re-factoring NO_SORT, maybe we can check and improve the
> global_sort solution.
>
> The solution should support both NO_SORT and GLOBAL_SORT, and automatically
> determines the number of partitions to avoid small file issue.
>
>
>
>
> -----
> Best Regards
> David Cai
> --
> Sent from:
> http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: [Discussion]Query Regarding Task launch mechanism for data load operations

kumarvishal09
Hi Venu,
@Ramana mentioned most of the cases were optimized for local sort.
Yes we can use Global sort like solution for No sort case so the number of
tasks can be based on number of executor+core and we can use compaction
after load, to handle small files.

I remember we do have one Block Assignment Strategy for loading based on
minimum size. Can u please check. This feature may need stabilization.
Hope works the same as expected.
*org.apache.carbondata.processing.util.CarbonLoaderUtil.BlockAssignmentStrategy*

/**
 * The node loads the smallest amount of data
 */
@CarbonProperty
public static final String CARBON_LOAD_MIN_SIZE_INMB = "load_min_size_inmb";

/**
 * the node minimum load data default value
 */
public static final String CARBON_LOAD_MIN_SIZE_INMB_DEFAULT = "0";

-Regards

Kumar Vishal




On Mon, Aug 17, 2020 at 9:39 PM Venkata Gollamudi <[hidden email]>
wrote:

> Hi Varun,
>
> Yes, previously most cases were tuned for LOCAL_SORT, where merging will
> automatically happen.  But certainly data loading flow can be improved to
> do it based on data size, rather than a fixed configuration.
> However old behaviour might also be required, if the user has to control
> the maximum number of partitions in case data size is too big.  This
> configuration has started as data loading cores are not transparent to
> spark, mainly in case of LOCAL_SORT.
>
> Same thing is applicable for insert into scenario also, as you said
> coalescing will reduce the load performance.
>
> Regards,
> Ramana
>
> On Fri, Aug 14, 2020 at 3:25 PM David CaiQiang <[hidden email]>
> wrote:
>
> > This mechanism will work fine for LOCAL_SORT loading of big data and the
> > small cluster with big executor.
> >
> > If it doesn't match these conditions, better consider a new solution to
> > adapter the generic scenario.
> >
> > I suggest re-factoring NO_SORT, maybe we can check and improve the
> > global_sort solution.
> >
> > The solution should support both NO_SORT and GLOBAL_SORT, and
> automatically
> > determines the number of partitions to avoid small file issue.
> >
> >
> >
> >
> > -----
> > Best Regards
> > David Cai
> > --
> > Sent from:
> > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
> >
>
kumar vishal
Reply | Threaded
Open this post in threaded view
|

Re: [Discussion]Query Regarding Task launch mechanism for data load operations

VenuReddy
This post was updated on .
Hi Vishal,

Thank you for the response.
Configuring load option `load_min_size_inmb` has helped to control the number of tasks
to launch in case of load from csv files and could eventually reduce the o/p
carbondata files from each executor when configured along with `carbon.number.of.cores.while.loading` dynamic property.


But in case of insert into table select from flow `loadDataFrame()`, problem
didn't get resolved as we have completely different task launching
approach(not same as in `loadDataFile()`. Do you have suggestions about any
paramter to fine tune in insert flow ?

1. Any way to launch more than 1 task per node ?
 
2. Any way to contrl the number of output carbondata files for target table,
when there are too many small sized carbondata files to read/select from src
table ?  Otherwise it generates the output files equal to input files.
    -> I tried carbon property,
`carbon.task.distribution`=`merge_small_files`. Could reduce the number of
files generated for target table. Scanrdd with
CARBON_TASK_DISTRIBUTION_MERGE_FILES used similar mechanism as global
partition load(considered filesMaxPartitionBytes, filesOpenCostInBytes and
defaultParallelism for split size).
        But, this property is not dynamically configured. Probably for some
reason ? Confused if it is a good option to use that property in this
scenario.

Any suggestions would be very helpful.

regards,
Venu



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/