Re: [DISCUSS] Data loading improvement

Posted by Jacky Li on May 22, 2017; 4:49am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/DISCUSS-Data-loading-improvement-tp11429p13059.html

Yes, Ravindra, unsafe sort will be better.
In my last mail, I mentioned an 8-byte encoded format for RowID + SORT_COLUMNS. If the SORT_COLUMNS are dictionary encoded, I think it is effectively like unsafe, since each entry is just a byte[8], right? So could we do this ourselves instead of depending on a 3rd-party library (sun.misc.Unsafe)?
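
A minimal sketch of such an 8-byte key, to make the idea concrete. The layout is an assumption for illustration only: two dictionary-encoded sort columns of 16 bits each in the high half and a 32-bit row id in the low half. The class and method names are hypothetical and are not CarbonData APIs. Arrays.sort is used only to keep the sketch short; the proposal quoted further down suggests avoiding it in the real sorter.

```java
import java.util.Arrays;

final class PackedSortKey {
    /** Packs two 16-bit dictionary ids and a 32-bit row id into one long (assumed layout). */
    static long pack(int dictId0, int dictId1, int rowId) {
        long key = ((long) (dictId0 & 0xFFFF) << 48)
                 | ((long) (dictId1 & 0xFFFF) << 32)
                 | (rowId & 0xFFFFFFFFL);
        // Flip the sign bit so that signed long order matches unsigned key order.
        return key ^ Long.MIN_VALUE;
    }

    /** The row id sits in the low 32 bits, which the sign flip does not touch. */
    static int rowId(long packed) {
        return (int) packed;
    }

    /** Returns row ids ordered by (col0, col1), with ties broken by row id. */
    static int[] sortedRowIds(int[] col0, int[] col1) {
        long[] keys = new long[col0.length];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = pack(col0[i], col1[i], i);
        }
        Arrays.sort(keys); // primitive sort: no boxing, no per-row objects
        int[] rowIds = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            rowIds[i] = rowId(keys[i]);
        }
        return rowIds;
    }
}
```

Because the whole sort key and the row id live in one primitive long, comparing two rows is a single long comparison and the key array stays contiguous in memory.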

Regards,
Jacky

> On May 22, 2017, at 11:54 AM, Ravindra Pesala <[hidden email]> wrote:
>
> Hi,
>
> Using Object[] as a row while loading is not efficient in terms of memory
> usage. It would be more efficient to keep the rows in unsafe memory, as it
> can store the data more compactly according to data type.
>
> Regarding sorting, it would be good to concentrate on a single sorting
> solution. Since we already have a stable unsafe sort, we should consider
> removing the other, normal sorting implementation to reduce the
> maintenance cost, and concentrate on improving the unsafe sort.
>
> As per our observation, GC is reduced a lot with unsafe sort, but some GC
> is still observed. I think that is because of the writer step, which still
> holds the data on heap for some time while sorting and writing the data
> blocklet-wise. We can consider keeping this data in unsafe memory as well
> to avoid GC.
>
>
> Regards,
> Ravindra.
>
> On 21 May 2017 at 23:15, Jacky Li <[hidden email]> wrote:
>
>> Hi,
>>
>> While working on the data load improvement and the encoding override feature,
>> I found that it is not efficient to use CarbonRow with Object[]. I
>> think a better way is to use fixed-length primitive types instead of Object.
>>
>> Since SORT_COLUMNS is now merged, I think it is possible to:
>> 1. separate key columns and non-key columns
>> 2. use the exact data type for all columns instead of Object
>> 3. stop differentiating dimension and measure while loading; encoding
>> is based on the default strategy or a user override
>>
>> For example:
>>
>> TablePage {
>>       ColumnPage[] keyPages;
>>       ColumnPage[] nonKeyPages;
>> }
>>
>> ColumnPage has the following implementations:
>> 1. FixLengthColumnPage // internal data type: byte, short, int, long,
>> float, double; dictionary-encoded columns should also use this class
>> 2. StringColumnPage // internal data type: byte[]
>> 3. DecimalColumnPage // internal data type: byte[]
>> 4. ComplexColumnPage // internal data type: byte[]
>>
>> If the user specifies only dictionary columns in SORT_COLUMNS and their
>> cardinality is low, we can further optimize the keyPage member into a
>> long data type (it can hold up to 4 columns, each with cardinality less than
>> 65535), so that it fits in the CPU cache and sorting becomes more
>> efficient.
>>
>> Regards,
>> Jacky
>>
>>> On April 27, 2017, at 9:20 AM, Jacky Li <[hidden email]> wrote:
>>>
>>> Hi community,
>>>
>>> More ideas on the loading process. In my opinion the ideal one should be
>>> as follows, please comment:
>>>
>>> 1. input step:
>>> - Parse the input data, either CSV or Dataframe; both are converted
>>> into CarbonRow.
>>> - Buffer the rows into CarbonRowBatch
>>> - Prefetch rows
>>>
>>> 2. convert step:
>>> - Convert fields based on the category of columns
>>> - for global dictionary encoded columns, convert them into dictionary
>>> ids.
>>> - for primitive type columns, keep the original type (no conversion)
>>> - for string and variable-length data type columns (like complex
>>> columns), convert to an LV (length-value) encoded byte array
>>>
>>> - Convert input rows to a columnar layout as we read them from the
>>> previous step. Create a ColumnPage for each column, and a DataPage for
>>> all columns.
>>>
>>> - The DataPage should store the key columns specified by SORT_COLUMNS
>>> together in one vector, so that sorting is efficient for the CPU cache
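
The LV (length-value) encoding mentioned in the convert step could look roughly like the sketch below. It assumes a 4-byte big-endian length prefix and UTF-8 bytes, and it ignores null handling; the actual prefix width and charset used by CarbonData are not stated in this thread. LvCodec is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LvCodec {
    /** Encodes each string as [4-byte length][UTF-8 bytes], back to back. */
    static byte[] encode(List<String> values) {
        List<byte[]> encoded = new ArrayList<>();
        int total = 0;
        for (String v : values) {
            byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            total += Integer.BYTES + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length).put(bytes);
        }
        return buf.array();
    }

    /** Reads length-prefixed values until the page is exhausted. */
    static List<String> decode(byte[] page) {
        List<String> values = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(page);
        while (buf.hasRemaining()) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            values.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return values;
    }
}
```

Storing the whole column as one byte array avoids one object per value, which is the memory concern raised in this thread.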
>>>
>>> 3. index-build step:
>>> - Depending on configuration, this step can be skipped: if SORT_COLUMNS
>>> is empty, that indicates no sort.
>>>
>>> - Add the DataPage into a Sorter; the sorter sorts the incoming DataPage
>>> on the fly. The sort result is an array of rowId (pageId + offset). Avoid
>>> using Arrays.sort in JDK.
>>>
>>> - The sorter should support batch sort or merge sort. For merge sort,
>>> multiple spilling strategies can be implemented.
>>>
>>> 4. write step:
>>> - Calculate the start/end key of MDKey over the ColumnPage
>>>
>>> - Calculate the statistics of the ColumnPage. Currently in carbondata the
>>> min/max stats are hard coded. As a first step of extension, we want to
>>> provide more stats, like histogram and bloom filter, that the user can
>>> choose from. An optional field needs to be added to the DataChunk struct
>>> in the thrift definition.
>>>
>>> - Encode and compress each column into a byte array. To make encoding more
>>> transparent to the user and extensible, we should first decouple the data
>>> type from the available codec options, so that the code can choose a codec
>>> for a specific data type by some strategy. For the strategy, we can start
>>> with a default one, and later open this interface for developers to
>>> extend (say, a developer can choose the codec using heuristic rules
>>> based on the data distribution).
>>>
>>> - After each column is encoded and compressed, follow the current
>>> Producer-Consumer logic to write to disk.
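
As a rough illustration of the Producer-Consumer logic in the write step: encoded pages go through a bounded queue, and a consumer groups them into blocklets once a size threshold is reached. This is only a sketch under assumptions. The page encoding itself is omitted, "writing" a blocklet is replaced by recording its size, and PageWriterSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class PageWriterSketch {
    private static final byte[] END = new byte[0]; // sentinel: no more pages

    /** Groups encoded pages into blocklets of at least blockletSize bytes. */
    static List<Integer> run(List<byte[]> encodedPages, int blockletSize) {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(4);
        List<Integer> blockletSizes = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            int pending = 0;
            try {
                for (byte[] page = queue.take(); page != END; page = queue.take()) {
                    pending += page.length;
                    if (pending >= blockletSize) {
                        blockletSizes.add(pending); // stands in for writing a blocklet
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    blockletSizes.add(pending); // flush the last, partial blocklet
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer side: a real loader would encode each page before queueing it.
            for (byte[] page : encodedPages) {
                queue.put(page);
            }
            queue.put(END);
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing pages", e);
        }
        return blockletSizes;
    }
}
```

The bounded queue gives back-pressure: if writing falls behind, the producer blocks instead of accumulating encoded pages on the heap.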
>>>
>>>
>>> As the bottleneck will be the merge sort, some thoughts on the merging
>>> strategy:
>>> Merge sort strategy:
>>> 1. spill both key columns and non-key columns in sorted order
>>> 2. spill key columns in sorted order and non-key columns in original
>>> order, and spill the rowId as well. Apply the rowId mapping only in the
>>> merger stage. This can avoid some random memory access on non-key columns.
>>> It is good if there are many non-key columns
>>> 3. do not spill key columns, and spill non-key columns in original order.
>>> This is good as we can keep more key columns in memory
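
Strategy 2 can be sketched as a k-way merge in which each run keeps its keys sorted, its non-key values in arrival order, and a rowId array linking the two. The sketch keeps runs in memory instead of on disk and uses a single long key and a single long non-key column, all of which are simplifying assumptions. Run and SpillMergeSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class SpillMergeSketch {
    /** One spilled run: keys sorted, non-key values left in arrival order. */
    static final class Run {
        final long[] sortedKeys; // ascending
        final int[] rowIds;      // rowIds[i] = original row of sortedKeys[i]
        final long[] nonKey;     // indexed by original row id
        int pos = 0;             // merge cursor into sortedKeys

        Run(long[] sortedKeys, int[] rowIds, long[] nonKey) {
            this.sortedKeys = sortedKeys;
            this.rowIds = rowIds;
            this.nonKey = nonKey;
        }
    }

    /** K-way merge; the rowId mapping is applied only here, at merge time. */
    static List<long[]> merge(List<Run> runs) {
        PriorityQueue<Run> heap = new PriorityQueue<>(
            (a, b) -> Long.compare(a.sortedKeys[a.pos], b.sortedKeys[b.pos]));
        for (Run r : runs) {
            if (r.sortedKeys.length > 0) {
                heap.add(r);
            }
        }
        List<long[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Run r = heap.poll();
            // Emit {key, nonKeyValue}, looking the non-key value up by row id.
            out.add(new long[] {r.sortedKeys[r.pos], r.nonKey[r.rowIds[r.pos]]});
            if (++r.pos < r.sortedKeys.length) {
                heap.add(r); // re-insert with the advanced cursor
            }
        }
        return out;
    }
}
```

During the sort only the keys and row ids are moved; the non-key values are touched once, when the merger emits each row.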
>>>
>>>
>>> Regards,
>>> Jacky
>>>
>>>
>>>> On April 22, 2017, at 8:08 PM, Liang Chen <[hidden email]> wrote:
>>>>
>>>> Jacky, thank you for listing these constructive improvements to data loading.
>>>>
>>>> I agree to consider all these improvement points; I have some concerns
>>>> only about the one below.
>>>> Before considering open interfaces for data loading, we need to define
>>>> more clearly what different roles block/blocklet/page play; then we
>>>> could consider some interfaces at block level, some at blocklet level,
>>>> and some at page level.
>>>>
>>>> Let us take up these improvements in the coming release.
>>>> --------------------------------------------------------------------
>>>> improvement 3: we want to open up some interfaces for letting developers
>>>> add more page encodings, statistics, and page compression. These
>>>> interfaces will be like callbacks, so a developer can write a new
>>>> encoding/statistics/compression method and the carbon loading process
>>>> will invoke it in this step. This interface will be like:
>>>>
>>>> Regards
>>>> Liang
>>>>
>>>> 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>:
>>>>
>>>>> I want to propose the following improvements to the data loading process:
>>>>>
>>>>> Currently different steps use different data layouts in CarbonRow,
>>>>> and the row is converted back and forth between steps. It is not easy
>>>>> for developers to understand the data structure used in each step, and
>>>>> it increases the memory requirement because some steps do unnecessary
>>>>> data copying. So I suggest improving it as follows:
>>>>>
>>>>> 1) input step: read the input and create a CarbonRow in which all
>>>>> fields are of string type
>>>>>
>>>>> 2) convert step: convert each string to byte[] according to its data
>>>>> type. This step has a compression effect on the input row, so it is good
>>>>> for saving memory, and it also takes care of null values
>>>>>     if it is a dictionary dimension, then convert to surrogate key;
>>>>>     if it is a no-dictionary dimension, then convert to byte[] representation;
>>>>>     if it is a complex dimension, then convert to byte[] representation;
>>>>>     if it is a measure, then convert to Object, like Integer, Long,
>>>>> Double, according to schema  --> change to byte[] instead of storing
>>>>> Object, to avoid boxing/unboxing and save memory
>>>>>
>>>>>     The conversion happens by updating the fields in CarbonRow in
>>>>> place, so no new CarbonRow should be created. However, there is
>>>>> currently a copy operation on the input CarbonRow for bad record
>>>>> handling  -->  do not copy the row; convert it back to the original
>>>>> value if it is a bad record
>>>>>
>>>>> 3) sort step:
>>>>>   improvement 1: sort the collected input CarbonRows. Currently this is
>>>>> done by copying the row objects into an internal buffer and sorting the
>>>>> buffer. --> to avoid copying the CarbonRow, we should create this
>>>>> buffer (with RowID) in the input step, and only output the sorted RowIDs
>>>>> (by swapping entries in the RowID array) according to the row values.
>>>>> If it is a merge sort, then write to file based on this sorted RowID
>>>>> array when spilling to disk. So no copy of CarbonRow is required.
>>>>>
>>>>>  improvement 2: when spilling to disk, it currently changes the field
>>>>> order in CarbonRow and writes a 3-element array, [global dictionary
>>>>> dimension, plain dimension and complex dimension, measure columns],
>>>>> because the merger expects it like this --> I think this is
>>>>> unnecessary; we can add serialization/deserialization capability to
>>>>> CarbonRow and use CarbonRow instead. In the case of a no-sort table,
>>>>> this also avoids the conversion in the write step.
>>>>>
>>>>> 4) write step:
>>>>>   currently it collects one page of data (32K rows) and starts a
>>>>> Producer, which is actually the encoding process for one page. In order
>>>>> to support parallel processing, after the page data is encoded it is put
>>>>> into a queue that is drained by the Consumer; the Consumer collects
>>>>> pages up to one blocklet size (configurable, say 64MB) and writes them
>>>>> to CarbonData files.
>>>>>
>>>>>   improvement 1: there is an unnecessary data copy and re-ordering of
>>>>> the fields of the row. It converts the row to [measure columns, plain
>>>>> dimension and complex dimension, global dictionary dimension], which is
>>>>> different from what the sort step outputs. --> so I suggest using
>>>>> CarbonRow only; no new row object should be created here.
>>>>>
>>>>>   improvement 2: there are multiple traversals of the page data in the
>>>>> code currently --> we should change this to first convert the CarbonRows
>>>>> to a ColumnarPage, which is the columnar representation of all columns
>>>>> in one page, and collect the start/end key and statistics while doing
>>>>> this columnar conversion. Then apply the inverted index, RLE, and
>>>>> compression processes to the ColumnarPage object.
>>>>>
>>>>>   improvement 3: we want to open up some interfaces for letting
>>>>> developers add more page encodings, statistics, and page compression.
>>>>> These interfaces will be like callbacks, so a developer can write a new
>>>>> encoding/statistics/compression method and the carbon loading process
>>>>> will invoke it in this step. The interfaces will be like:
>>>>>
>>>>> /**
>>>>> *  Codec for a column page. Implementations should not keep state across
>>>>> *  pages; the caller will use the same object to encode multiple pages.
>>>>> */
>>>>> interface PageCodec {
>>>>> /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>>>> String getName();
>>>>> void startPage(int pageID);
>>>>> void processColumn(ColumnBatch batch);
>>>>> byte[] endPage(int pageID);
>>>>> ColumnBatch decode(byte[] encoded);
>>>>> }
>>>>>
>>>>> /** Compressor for the page data. The flow is encode->compress and
>>>>> decompress->decode */
>>>>> interface PageCompressor {
>>>>> /** Compressor name will be stored in BlockletHeader (DataChunk3) */
>>>>> String getName();
>>>>> byte[] compress(byte[] encodedData);
>>>>> byte[] decompress(byte[] data);
>>>>> }
>>>>>
>>>>> /** Calculate the statistics for a column page and blocklet */
>>>>> interface Statistics {
>>>>> /** Statistics name will be stored in BlockletHeader (DataChunk3) */
>>>>> String getName();
>>>>> void startPage(int pageID);
>>>>> void endPage(int pageID);
>>>>> void startBlocklet(int blockletID);
>>>>> void endBlocklet(int blockletID);
>>>>>
>>>>> /** Update the stats for the input batch */
>>>>> void update(ColumnBatch batch);
>>>>>
>>>>> /** Output will be written to DataChunk2 in BlockletHeader (DataChunk3) */
>>>>> int[] getPageStatistics();
>>>>>
>>>>> /** Output will be written to Footer */
>>>>> int[] getBlockletStatistics();
>>>>> }
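
To show how a developer might plug into the proposed PageCompressor callback, here is a minimal sketch that implements it with the JDK's DEFLATE classes. The interface is copied from the proposal above; the choice of DEFLATE, the name "deflate", and the class DeflatePageCompressor are illustrative assumptions, not what CarbonData ships.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Interface as proposed in this thread. */
interface PageCompressor {
    String getName();
    byte[] compress(byte[] encodedData);
    byte[] decompress(byte[] data);
}

/** Hypothetical implementation backed by java.util.zip. */
final class DeflatePageCompressor implements PageCompressor {
    @Override
    public String getName() {
        return "deflate";
    }

    @Override
    public byte[] compress(byte[] encodedData) {
        Deflater deflater = new Deflater();
        deflater.setInput(encodedData);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            out.write(chunk, 0, deflater.deflate(chunk));
        }
        deflater.end(); // release native memory
        return out.toByteArray();
    }

    @Override
    public byte[] decompress(byte[] data) {
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                // No progress and not finished: the input is incomplete.
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated page data");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt page data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

The implementation keeps no state between calls, so one object can be reused across pages, in line with the note on PageCodec in the proposal.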
>>>>>
>>>>> Also, a partition step should be added somewhere to support the
>>>>> partition feature (CARBONDATA-910); where it goes depends on whether we
>>>>> implement the partition shuffling in the spark layer or the carbon layer
>>>>> (before the input step or after the conversion step). What is the
>>>>> current thinking on this? @CaiQiang @Lionel
>>>>>
>>>>> What do you think about these improvements?
>>>>>
>>>>> Regards,
>>>>> Jacky
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards
>>>> Liang
>>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
> --
> Thanks & Regards,
> Ravi
>