Re: [DISCUSS] Data loading improvement

Posted by ravipesala on May 22, 2017; 3:54am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/DISCUSS-Data-loading-improvement-tp11429p13057.html

Hi,

Using Object[] as the row representation during loading is not efficient in
terms of memory usage. It would be more efficient to keep the rows in unsafe
memory, where the data can be laid out more compactly according to each data type.

Regarding sorting, it would be good to concentrate on a single sorting
solution. Since we already have a stable unsafe sort, we should consider
removing the other (normal) sorting implementation to reduce the
maintenance cost, and concentrate on improving the unsafe sort.

From what we have observed, GC is reduced a lot with unsafe sort, but some
GC still occurs. I think that is because of the writer step, which still
holds the data on heap for some time while sorting and writing the data
blocklet by blocklet. We can consider keeping this data in unsafe memory
as well to avoid GC.
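
To make the memory point concrete, here is a minimal sketch, assuming a fixed two-column row (an int dictionary id and a long measure). It uses a direct ByteBuffer as a portable stand-in for the unsafe memory manager; OffHeapRowStore and its layout are hypothetical, not CarbonData code. Each row takes 12 bytes outside the heap, whereas an Object[] row needs a boxed Integer and a boxed Long per row.

```java
import java.nio.ByteBuffer;

/** Hypothetical fixed-width row store kept outside the Java heap. */
final class OffHeapRowStore {
    // Assumed row layout: int dictionary id (4 bytes) followed by long measure (8 bytes).
    private static final int ROW_SIZE = Integer.BYTES + Long.BYTES;
    private final ByteBuffer buffer;
    private int rowCount;

    OffHeapRowStore(int capacityRows) {
        // A direct buffer is allocated outside the GC-managed heap.
        this.buffer = ByteBuffer.allocateDirect(capacityRows * ROW_SIZE);
    }

    /** Appends one row and returns its row id. */
    int append(int dictionaryId, long measure) {
        int base = rowCount * ROW_SIZE;
        buffer.putInt(base, dictionaryId);
        buffer.putLong(base + Integer.BYTES, measure);
        return rowCount++;
    }

    int dictionaryId(int rowId) {
        return buffer.getInt(rowId * ROW_SIZE);
    }

    long measure(int rowId) {
        return buffer.getLong(rowId * ROW_SIZE + Integer.BYTES);
    }

    int size() {
        return rowCount;
    }
}
```

Reading a field is a plain offset calculation, so no per-row objects are created on either the write or the read path.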


Regards,
Ravindra.

On 21 May 2017 at 23:15, Jacky Li <[hidden email]> wrote:

> Hi,
>
> While working on the data load improvement and the encoding override feature,
> I found that it is not efficient to use CarbonRow with Object[]. I
> think a better way is to use fixed-length primitive types instead of Object.
>
> Since SORT_COLUMNS is now merged, I think it is possible to:
> 1. separate key columns and non-key columns
> 2. use the exact data type for every column instead of Object
> 3. stop differentiating dimensions and measures while loading; encoding
> is based on the default strategy or a user override
>
> For example:
>
> TablePage {
>         ColumnPage[] keyPages;
>         ColumnPage[] nonKeyPages;
> }
>
> ColumnPage has the following implementations:
> 1. FixLengthColumnPage // internal data type: byte, short, int, long,
> float, double; dictionary-encoded columns should also use this class
> 2. StringColumnPage // internal data type: byte[]
> 3. DecimalColumnPage // internal data type: byte[]
> 4. ComplexColumnPage // internal data type: byte[]
>
> If the user specifies only dictionary columns in SORT_COLUMNS and their
> cardinality is low, we can further optimize the keyPage member into a
> long data type (it can hold up to 4 columns, each with cardinality less than
> 65535), so that it fits in the CPU cache and sorting becomes more
> efficient.
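
A minimal sketch of that packing, assuming each dictionary id fits in 16 bits and the first sort column goes in the highest bits; PackedSortKey is a hypothetical name, not an existing class. Keys must be compared as unsigned values, because an id of 32768 or more in the first column sets the sign bit of the long.

```java
/** Hypothetical helper that packs up to four 16-bit dictionary ids into one long sort key. */
final class PackedSortKey {
    private static final int MAX_COLUMNS = 4;
    private static final int BITS_PER_COLUMN = 16;

    /** Packs the ids so that the first column occupies the most significant bits. */
    static long pack(int... dictionaryIds) {
        if (dictionaryIds.length > MAX_COLUMNS) {
            throw new IllegalArgumentException("at most 4 key columns fit in a long");
        }
        long key = 0L;
        for (int i = 0; i < MAX_COLUMNS; i++) {
            // Missing trailing columns are padded with 0.
            int id = i < dictionaryIds.length ? dictionaryIds[i] : 0;
            if (id < 0 || id > 0xFFFF) {
                throw new IllegalArgumentException("dictionary id out of 16-bit range: " + id);
            }
            key = (key << BITS_PER_COLUMN) | id;
        }
        return key;
    }

    /** Orders keys column by column; unsigned comparison keeps large first-column ids in order. */
    static int compare(long a, long b) {
        return Long.compareUnsigned(a, b);
    }

    /** Extracts the id of the column at the given index (0 = first sort column). */
    static int column(long key, int index) {
        int shift = BITS_PER_COLUMN * (MAX_COLUMNS - 1 - index);
        return (int) ((key >>> shift) & 0xFFFF);
    }
}
```

With this layout a whole multi-column comparison is a single 64-bit compare, and a key page is just a long[].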
>
> Regards,
> Jacky
>
> > On April 27, 2017, at 9:20 AM, Jacky Li <[hidden email]> wrote:
> >
> > Hi community,
> >
> > Some more ideas on the loading process. In my opinion the ideal one
> > would be as follows; please comment:
> >
> > 1. input step:
> > - Parse the input data, either CSV or Dataframe; both are converted
> > into CarbonRow.
> > - Buffer the rows into CarbonRowBatch
> > - Prefetch rows
> >
> > 2. convert step:
> > - Convert fields based on the category of the column
> >  - for global dictionary encoded columns, convert them into dictionary
> > id.
> >  - for primitive type columns, keep the original type (no conversion)
> >  - for string and variable-length data type columns (like complex
> > columns), convert to an LV (length-value) encoded byte array
> >
> > - Convert the input rows to columnar layout as we read them from the
> > previous step. Create a ColumnPage for each column, and a DataPage for
> > all columns.
> >
> > - The DataPage should store the key columns specified by SORT_COLUMNS
> > together in one vector, so that sorting is efficient for the CPU cache
> >
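
A minimal sketch of the LV encoding mentioned in the convert step, assuming a 4-byte big-endian length prefix and UTF-8 values. The thread does not specify the prefix width, so that choice and the LvCodec name are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-value codec for variable-length column values. */
final class LvCodec {
    /** Writes each value as a 4-byte length followed by its UTF-8 bytes. */
    static byte[] encode(List<String> values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String value : values) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            byte[] length = ByteBuffer.allocate(Integer.BYTES).putInt(bytes.length).array();
            out.write(length, 0, length.length);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    /** Reads length-value pairs until the input is exhausted. */
    static List<String> decode(byte[] encoded) {
        List<String> values = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(encoded);
        while (in.hasRemaining()) {
            byte[] bytes = new byte[in.getInt()];
            in.get(bytes);
            values.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return values;
    }
}
```

All values of a column end up in one contiguous byte[], which is the shape a StringColumnPage-style page would hold.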
> > 3. index-build step:
> > - Depending on configuration, this step may be skipped: if
> > SORT_COLUMNS is empty, no sort is required.
> >
> > - Add the DataPage to a Sorter; the sorter sorts the incoming DataPages
> > on the fly. The sort result is an array of rowId (pageId + offset). Avoid
> > using Arrays.sort in JDK.
> >
> > - The sorter should support batch sort or merge sort. For merge sort,
> > multiple spilling strategies can be implemented.
> >
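
A minimal sketch of sorting by rowId rather than moving rows, assuming the key column is already available as a long[] and a rowId is simply an index into it. RowIdSorter is a hypothetical name, and the hand-written quicksort is only one possible choice for avoiding Arrays.sort.

```java
/** Hypothetical sorter that orders row ids by key without moving the key data. */
final class RowIdSorter {
    /** Returns row ids in ascending key order; the keys array is left untouched. */
    static int[] sortedRowIds(long[] keys) {
        int[] rowIds = new int[keys.length];
        for (int i = 0; i < rowIds.length; i++) {
            rowIds[i] = i;
        }
        quickSort(keys, rowIds, 0, rowIds.length - 1);
        return rowIds;
    }

    // In-place quicksort over the row id array; only row ids are swapped.
    private static void quickSort(long[] keys, int[] rowIds, int lo, int hi) {
        while (lo < hi) {
            long pivot = keys[rowIds[lo + (hi - lo) / 2]];
            int i = lo;
            int j = hi;
            while (i <= j) {
                while (keys[rowIds[i]] < pivot) i++;
                while (keys[rowIds[j]] > pivot) j--;
                if (i <= j) {
                    int tmp = rowIds[i];
                    rowIds[i] = rowIds[j];
                    rowIds[j] = tmp;
                    i++;
                    j--;
                }
            }
            // Recurse into the smaller partition and loop on the larger one
            // to keep the recursion depth logarithmic.
            if (j - lo < hi - i) {
                quickSort(keys, rowIds, lo, j);
                lo = i;
            } else {
                quickSort(keys, rowIds, i, hi);
                hi = j;
            }
        }
    }
}
```

This sort is not stable, and it compares keys as signed longs; both would need revisiting for a real sorter.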
> > 4. write step:
> > - Calculate the start/end key of MDKey over the ColumnPage
> >
> > - Calculate the statistics of the ColumnPage. Currently in carbondata
> > the min/max stats are hard-coded. As a first step of extension, we want to
> > provide more stats, like histogram and bloom filter, that the user can
> > choose from. An optional field needs to be added to the DataChunk struct
> > in the thrift definition.
> >
> > - Encode and compress each column into a byte array. To make encoding more
> > transparent to the user and extensible, we should first decouple the data
> > type from the available codec options, so that the code can choose a codec
> > for a specific data type by some strategy. For the strategy, we can start
> > with a default one, and later open this interface for developers to
> > extend (say, a developer can choose a codec based on heuristic rules
> > derived from the data distribution).
> >
> > - After each column is encoded and compressed, follow the current
> > Producer-Consumer logic to write to disk.
> >
> >
> > As the bottleneck will be the merge sort, here are some thoughts on the
> > merging strategy:
> > Merge sort strategy:
> > 1. spill both key columns and non-key columns in sorted order
> > 2. spill key columns in sorted order and non-key columns in original
> > order, and spill the rowId as well. Apply the rowId mapping only in the
> > merger stage. This can avoid some random memory access on non-key
> > columns. It is good if there are many non-key columns
> > 3. do not spill key columns, and spill non-key columns in original order.
> > This is good as we can keep more key columns in memory
> >
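
A minimal sketch of the rowId mapping in strategy 2, assuming one non-key column held as a long[] in original order and an int[] of rowIds already sorted by key; RowIdMapping is a hypothetical name. The non-key data is only gathered into key order at merge time.

```java
/** Hypothetical merger-side helper for strategy 2. */
final class RowIdMapping {
    /** Gathers a non-key column, stored in original order, into sorted-key order. */
    static long[] reorder(long[] nonKeyColumn, int[] sortedRowIds) {
        long[] sorted = new long[sortedRowIds.length];
        for (int i = 0; i < sortedRowIds.length; i++) {
            // The only random access happens here, once per row, in the merger.
            sorted[i] = nonKeyColumn[sortedRowIds[i]];
        }
        return sorted;
    }
}
```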
> >
> > Regards,
> > Jacky
> >
> >
> >> On April 22, 2017, at 8:08 PM, Liang Chen <[hidden email]> wrote:
> >>
> >> Jacky, thank you for listing these constructive improvements to data loading.
> >>
> >> I agree to consider all these improvement points; I only have some
> >> concerns about the one below.
> >> Before considering opening interfaces for data loading, we need to define
> >> more clearly which roles block/blocklet/page each play. Then we
> >> could consider some interfaces at block level, some at
> >> blocklet level, and some at page level.
> >>
> >> Let us take up these improvements in the coming release.
> >> --------------------------------------------------------------------
> >> improvement 3: we want to open up some interfaces to let developers
> >> add more page encoding, statistics, and page compression. These
> >> interfaces will be like callbacks, so a developer can write a new
> >> encoding/statistics/compression method and the carbon loading process
> >> will invoke it in this step. This interface will be like:
> >>
> >> Regards
> >> Liang
> >>
> >> 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>:
> >>
> >>> I want to propose the following improvements to the data loading process:
> >>>
> >>> Currently different steps use different data layouts in CarbonRow,
> >>> and the row is converted back and forth between steps. It is not easy for
> >>> developers to understand the data structure used in each step, and it
> >>> increases the memory requirement because some steps do unnecessary data
> >>> copying. So I suggest improving it as follows:
> >>>
> >>>  1) input step: read the input and create a CarbonRow with all fields as
> >>> string type
> >>>
> >>>  2) convert step: convert each string to byte[] according to its data type.
> >>> This step has a compression effect on the input row, so it is good for
> >>> saving memory, and it also takes care of null values
> >>>       if it is a dictionary dimension, convert to surrogate key;
> >>>       if it is a no-dictionary dimension, convert to byte[] representation;
> >>>       if it is a complex dimension, convert to byte[] representation;
> >>>       if it is a measure, convert to Object, like Integer, Long,
> >>> Double, according to schema  --> change to byte[] instead of storing
> >>> Object to avoid boxing/unboxing and save memory
> >>>
> >>>       The conversion happens by updating the fields of the CarbonRow in
> >>> place, so no new CarbonRow should be created. However, there is a
> >>> copy operation of the input CarbonRow for bad record handling  -->  do not
> >>> copy the row; convert it back to the original value if it is a bad record
> >>>
> >>>  3) sort step:
> >>>     improvement 1: sort the collected input CarbonRow. Currently this
> >>> is done by copying the row object into an internal buffer and sorting
> >>> the buffer. --> to avoid copying the CarbonRow, we should create this
> >>> buffer (with RowID) in the input step, and only output the sorted RowID (by
> >>> swapping entries in the RowID array) according to the row values. If it
> >>> is a merge sort, write to file based on this sorted RowID array when
> >>> spilling to disk. So no copy of CarbonRow is required.
> >>>
> >>>    improvement 2: when spilling to disk, currently the field order in
> >>> CarbonRow is changed; the row is written as a 3-element array, [global
> >>> dictionary dimension, plain dimension and complex dimension, measure
> >>> columns], because the merger expects it like this --> I think this is
> >>> unnecessary; we can add serialization/deserialization capability to
> >>> CarbonRow and use CarbonRow instead. In the case of a no-sort table, this
> >>> also avoids the conversion in the write step.
> >>>
> >>>  4) write step:
> >>>     currently it collects one page of data (32K rows) and starts a
> >>> Producer, which is actually the encoding process for one page. To
> >>> support parallel processing, the encoded page is put into
> >>> a queue that is drained by the Consumer; the Consumer collects
> >>> pages up to one blocklet size (configurable, say 64MB) and writes them to
> >>> CarbonData files.
> >>>
> >>>     improvement 1: there is an unnecessary data copy and re-ordering of
> >>> the fields of the row. It converts the row to [measure columns, plain
> >>> dimension and complex dimension, global dictionary dimension], which is
> >>> different from what the sort step outputs. --> so I suggest using
> >>> CarbonRow only; no new row object should be created here.
> >>>
> >>>     improvement 2: there are multiple traversals of the page data in the
> >>> code currently --> we should first convert the CarbonRow to
> >>> ColumnarPage, which is the columnar representation of all columns in
> >>> one page, and collect the start/end key and statistics while doing this
> >>> columnar conversion. Then apply the inverted index, RLE, and compression
> >>> to the ColumnarPage object.
> >>>
> >>>     improvement 3: we want to open up some interfaces to let
> >>> developers add more page encoding, statistics, and page compression.
> >>> These interfaces will be like callbacks, so a developer can write a new
> >>> encoding/statistics/compression method and the carbon loading process will
> >>> invoke it in this step. This interface will be like:
> >>>
> >>> /**
> >>>  * Codec for a column page. Implementations should not keep state across
> >>>  * pages; the caller will use the same object to encode multiple pages.
> >>>  */
> >>> interface PageCodec {
> >>>   /** Codec name will be stored in BlockletHeader (DataChunk3) */
> >>>   String getName();
> >>>   void startPage(int pageID);
> >>>   void processColumn(ColumnBatch batch);
> >>>   byte[] endPage(int pageID);
> >>>   ColumnBatch decode(byte[] encoded);
> >>> }
> >>>
> >>> /** Compressor of the page data. The flow is encode->compress, and
> >>>  *  decompress->decode */
> >>> interface PageCompressor {
> >>>   /** Compressor name will be stored in BlockletHeader (DataChunk3) */
> >>>   String getName();
> >>>   byte[] compress(byte[] encodedData);
> >>>   byte[] decompress(byte[] data);
> >>> }
> >>>
> >>> /** Calculate the statistics for a column page and blocklet */
> >>> interface Statistics {
> >>>   /** Statistics name will be stored in BlockletHeader (DataChunk3) */
> >>>   String getName();
> >>>   void startPage(int pageID);
> >>>   void endPage(int pageID);
> >>>   void startBlocklet(int blockletID);
> >>>   void endBlocklet(int blockletID);
> >>>
> >>>   /** Update the stats for the input batch */
> >>>   void update(ColumnBatch batch);
> >>>
> >>>   /** Output will be written to DataChunk2 in BlockletHeader (DataChunk3) */
> >>>   int[] getPageStatistics();
> >>>
> >>>   /** Output will be written to Footer */
> >>>   int[] getBlockletStatistics();
> >>> }
> >>>
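
To show how a developer might plug into the proposed callbacks, here is a minimal sketch of one PageCompressor implementation. The interface is copied from the proposal above; DeflatePageCompressor, the name "deflate", and the use of java.util.zip are assumptions for illustration only, not an existing CarbonData codec.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** The compressor interface as proposed in the thread. */
interface PageCompressor {
    String getName();
    byte[] compress(byte[] encodedData);
    byte[] decompress(byte[] data);
}

/** Hypothetical implementation backed by the JDK's DEFLATE support. */
final class DeflatePageCompressor implements PageCompressor {
    @Override
    public String getName() {
        return "deflate";
    }

    @Override
    public byte[] compress(byte[] encodedData) {
        Deflater deflater = new Deflater();
        deflater.setInput(encodedData);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    @Override
    public byte[] decompress(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                // No progress and not finished means the input is incomplete.
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or invalid page data");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt page data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

The implementation keeps no state between calls, so the same object can be reused across pages, in line with the statelessness requirement stated for PageCodec.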
> >>> Also, a partition step should be added somewhere to support the
> >>> partition feature (CARBONDATA-910); where it goes depends on whether we
> >>> implement the partition shuffling in the spark layer or the carbon layer
> >>> (before the input step or after the conversion step). What is the
> >>> current idea on this? @CaiQiang
> >>> @Lionel
> >>>
> >>> What do you think about these improvements?
> >>>
> >>> Regards,
> >>> Jacky
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >> Regards
> >> Liang
> >>
> >
> >
> >
>
>
>
>


--
Thanks & Regards,
Ravi