I want to propose following improvement on data loading process:
Currently different steps are using different data layout in CarbonRow, and it convert back and forth in different steps. It is not easy for developer to understand the data structure used in each steps and it increase the memory requirement as it is doing unnecessary data copying in some steps. So, suggest to improve it as following 1) input step: read input and create a CarbonRow with all fields are string type 2) convert step: convert string to byte[] according to its data type, this step has compression effect of the input row so it is good for saving memory and it also take cares of the null value if it is dictionary dimension then convert to surrogate key; if it is no dictionary then convert to byte[] representation; if it is complex dimension, then convert to byte[] representation; if it is measure then convert to Object, like Integer, Long, Double, according to schema —> change to byte[] instead of storing Object to avoid boxing/unboxing and save memory The conversion is happened by updating the field in CarbonRow in place, there should be no new CarbonRow created, however, there is a copy operation of the input CarbonRow, for bad record handling —> do not copy the row, convert it back to value if it is bad record 3) sort step: improvement 1: sort the collected input CarbonRow. Currently this is done by copying the row object into internal buffer and sort is done on the buffer. —> to avoid the copying of the CarbonRow, we should create this buffer (with RowID) in input step, and only output the sorted RowID (by swapping its value in the RowID array) according to its value. If it is a merge sort, then write to file based on this sorted RowID array when spilling to disk. So no copy of CarbonRow is required. improvement 2: when spilling to disk, currently it changes the field order in CarbonRow, it is writing as a 3-elements array, [global dictionary dimension, plain dimension and complex dimension, measure columns] , this is because the merger is expecting it like this —> I think this is unnecessary, we can add serialization/deserialization capability in CarbonRow and use CarbonRow instead. In the case of no-sort table, it also avoid this conversion in write step. 4) write step: currently it will collect one page of data (32K rows) and start a Producer which actually is the encode process of one page. In order to support parallel processing, after the page data is encoded then put it to a queue which will be taken by the Consumer, the Consumer will collect pages up to one blocklet size (configurable, say 64MB), and write to CarbonData files. improvement 1: there is an unnecessary data copy and re-ordering of the fields of the row. it converts the row to: [measure columns, plain dimension and complex dimension, global dictionary dimension] it is different from what sort step outputs. —> so suggest to use CarbonRow only. no new row object should be created here. improvement 2: there are multiple traversal of the page data in the code currently —> we should change to, firstly convert the CarbonRow to ColumnarPage which is the columnar representation for all columns in one page, and collect the start/end key and statistics when doing this columnar conversion. Then apply inverted index, RLE, compression process based on ColumnarPage object. improvement 3: we want to open up some interfaces for letting developer to add more page encoding, statistics, page compression. These interface will be like callbacks, so developer can write new encoding/statistics/compression method and carbon loading process will invoke it in this step. This interface will be like: /** * Codec for a column page, implementation should not keep state across pages, * caller will use the same object to encode multiple pages */ interface PageCodec { /** Codec name will be stored in BlockletHeader (DataChunk3) */ String getName(); void startPage(int pageID); void processColumn(ColumnBatch batch); byte[] endPage(int pageID); ColumnBatch decode(byte[] encoded); } /** Compressor of the page data, the flow is encode->compress, and decompress->decode */ interface PageCompressor { /** Codec name will be stored in BlockletHeader (DataChunk3) */ String getName(); byte[] compress(byte[] encodedData); byte[] decompress(byte[] data); } /** Calculate the statistics for a column page and blocklet */ interface Statistics { /** Codec name will be stored in BlockletHeader (DataChunk3) */ String getName(); void startPage(int pageID); void endPage(int pageID); void startBlocklet(int blockletID); void endBlocklet(int blockletID); /** Update the stats for the input batch */ void update(ColumnBatch batch); /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ int[] getPageStatistisc(); /** Output will be written to Footer */ int[] getBlockletStatistics(); } And, there should be a partition step adding somewhere to support partition feature (CARBONDATA-910), and it depends on whether we implement this partition shuffling in spark layer or carbon layer. (before input step or after conversion step). What is the current idea of this? @CaiQiang @Lionel What you think about these improvements? Regards, Jacky |
Administrator
|
Jacky, thank you list these constructive improvements of data loading.
Agree to consider all these improvement points, only the below one i have some concerns. Before considering open interfaces for data loading, we need to more clearly define block/blocklet/page which play what different roles, then we could consider some interfaces for block level, some interfaces for blocklet level, some for page level. Let us take up these improvements in the coming release. -------------------------------------------------------------------------------------------------------------------- improvement 3: we want to open up some interfaces for letting developer to add more page encoding, statistics, page compression. These interface will be like callbacks, so developer can write new encoding/statistics/compression method and carbon loading process will invoke it in this step. This interface will be like: Regards Liang 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>: > I want to propose following improvement on data loading process: > > Currently different steps are using different data layout in CarbonRow, > and it convert back and forth in different steps. It is not easy for > developer to understand the data structure used in each steps and it > increase the memory requirement as it is doing unnecessary data copying in > some steps. So, suggest to improve it as following > > 1) input step: read input and create a CarbonRow with all fields are > string type > > 2) convert step: convert string to byte[] according to its data type, > this step has compression effect of the input row so it is good for saving > memory and it also take cares of the null value > if it is dictionary dimension then convert to surrogate key; > if it is no dictionary then convert to byte[] representation; > if it is complex dimension, then convert to byte[] representation; > if it is measure then convert to Object, like Integer, Long, > Double, according to schema —> change to byte[] instead of storing Object > to avoid boxing/unboxing and save memory > > The conversion is happened by updating the field in CarbonRow in > place, there should be no new CarbonRow created, however, there is a copy > operation of the input CarbonRow, for bad record handling —> do not copy > the row, convert it back to value if it is bad record > > 3) sort step: > improvement 1: sort the collected input CarbonRow. Currently this is > done by copying the row object into internal buffer and sort is done on the > buffer. —> to avoid the copying of the CarbonRow, we should create this > buffer (with RowID) in input step, and only output the sorted RowID (by > swapping its value in the RowID array) according to its value. If it is a > merge sort, then write to file based on this sorted RowID array when > spilling to disk. So no copy of CarbonRow is required. > > improvement 2: when spilling to disk, currently it changes the field > order in CarbonRow, it is writing as a 3-elements array, [global dictionary > dimension, plain dimension and complex dimension, measure columns] , this > is because the merger is expecting it like this —> I think this is > unnecessary, we can add serialization/deserialization capability in > CarbonRow and use CarbonRow instead. In the case of no-sort table, it also > avoid this conversion in write step. > > 4) write step: > currently it will collect one page of data (32K rows) and start a > Producer which actually is the encode process of one page. In order to > support parallel processing, after the page data is encoded then put it to > a queue which will be taken by the Consumer, the Consumer will collect > pages up to one blocklet size (configurable, say 64MB), and write to > CarbonData files. > > improvement 1: there is an unnecessary data copy and re-ordering of > the fields of the row. it converts the row to: [measure columns, plain > dimension and complex dimension, global dictionary dimension] it is > different from what sort step outputs. —> so suggest to use CarbonRow only. > no new row object should be created here. > > improvement 2: there are multiple traversal of the page data in the > code currently —> we should change to, firstly convert the CarbonRow to > ColumnarPage which is the columnar representation for all columns in one > page, and collect the start/end key and statistics when doing this columnar > conversion. Then apply inverted index, RLE, compression process based on > ColumnarPage object. > > improvement 3: we want to open up some interfaces for letting > developer to add more page encoding, statistics, page compression. These > interface will be like callbacks, so developer can write new > encoding/statistics/compression method and carbon loading process will > invoke it in this step. This interface will be like: > > /** > * Codec for a column page, implementation should not keep state across > pages, > * caller will use the same object to encode multiple pages > */ > interface PageCodec { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > void startPage(int pageID); > void processColumn(ColumnBatch batch); > byte[] endPage(int pageID); > ColumnBatch decode(byte[] encoded); > } > > /** Compressor of the page data, the flow is encode->compress, and > decompress->decode */ > interface PageCompressor { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > byte[] compress(byte[] encodedData); > byte[] decompress(byte[] data); > } > > /** Calculate the statistics for a column page and blocklet */ > interface Statistics { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > void startPage(int pageID); > void endPage(int pageID); > void startBlocklet(int blockletID); > void endBlocklet(int blockletID); > > /** Update the stats for the input batch */ > void update(ColumnBatch batch); > > /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ > int[] getPageStatistisc(); > > /** Output will be written to Footer */ > int[] getBlockletStatistics(); > } > > And, there should be a partition step adding somewhere to support > partition feature (CARBONDATA-910), and it depends on whether we implement > this partition shuffling in spark layer or carbon layer. (before input step > or after conversion step). What is the current idea of this? @CaiQiang > @Lionel > > What you think about these improvements? > > Regards, > Jacky > > > > > -- Regards Liang |
In reply to this post by Jacky Li
+1 jacky, Its a very good initiative. I think it will improve the
performance by reducing the GC overhead as the new approach could potentially create lesser short lived objects. I have few concerns 1) I could not follow the Sort improvement using row ID array saperately, could you elaborate more on that. 2) For the multi traversal occuring in the write step, you had suggested conversion of CarbonRow to ColumnarPage. By question is , won't this conversion require multi-conversion? Regards Vimal On Fri, Apr 21, 2017 at 12:20 PM, Jacky Li <[hidden email]> wrote: > I want to propose following improvement on data loading process: > > Currently different steps are using different data layout in CarbonRow, > and it convert back and forth in different steps. It is not easy for > developer to understand the data structure used in each steps and it > increase the memory requirement as it is doing unnecessary data copying in > some steps. So, suggest to improve it as following > > 1) input step: read input and create a CarbonRow with all fields are > string type > > 2) convert step: convert string to byte[] according to its data type, > this step has compression effect of the input row so it is good for saving > memory and it also take cares of the null value > if it is dictionary dimension then convert to surrogate key; > if it is no dictionary then convert to byte[] representation; > if it is complex dimension, then convert to byte[] representation; > if it is measure then convert to Object, like Integer, Long, > Double, according to schema —> change to byte[] instead of storing Object > to avoid boxing/unboxing and save memory > > The conversion is happened by updating the field in CarbonRow in > place, there should be no new CarbonRow created, however, there is a copy > operation of the input CarbonRow, for bad record handling —> do not copy > the row, convert it back to value if it is bad record > > 3) sort step: > improvement 1: sort the collected input CarbonRow. Currently this is > done by copying the row object into internal buffer and sort is done on the > buffer. —> to avoid the copying of the CarbonRow, we should create this > buffer (with RowID) in input step, and only output the sorted RowID (by > swapping its value in the RowID array) according to its value. If it is a > merge sort, then write to file based on this sorted RowID array when > spilling to disk. So no copy of CarbonRow is required. > > improvement 2: when spilling to disk, currently it changes the field > order in CarbonRow, it is writing as a 3-elements array, [global dictionary > dimension, plain dimension and complex dimension, measure columns] , this > is because the merger is expecting it like this —> I think this is > unnecessary, we can add serialization/deserialization capability in > CarbonRow and use CarbonRow instead. In the case of no-sort table, it also > avoid this conversion in write step. > > 4) write step: > currently it will collect one page of data (32K rows) and start a > Producer which actually is the encode process of one page. In order to > support parallel processing, after the page data is encoded then put it to > a queue which will be taken by the Consumer, the Consumer will collect > pages up to one blocklet size (configurable, say 64MB), and write to > CarbonData files. > > improvement 1: there is an unnecessary data copy and re-ordering of > the fields of the row. it converts the row to: [measure columns, plain > dimension and complex dimension, global dictionary dimension] it is > different from what sort step outputs. —> so suggest to use CarbonRow only. > no new row object should be created here. > > improvement 2: there are multiple traversal of the page data in the > code currently —> we should change to, firstly convert the CarbonRow to > ColumnarPage which is the columnar representation for all columns in one > page, and collect the start/end key and statistics when doing this columnar > conversion. Then apply inverted index, RLE, compression process based on > ColumnarPage object. > > improvement 3: we want to open up some interfaces for letting > developer to add more page encoding, statistics, page compression. These > interface will be like callbacks, so developer can write new > encoding/statistics/compression method and carbon loading process will > invoke it in this step. This interface will be like: > > /** > * Codec for a column page, implementation should not keep state across > pages, > * caller will use the same object to encode multiple pages > */ > interface PageCodec { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > void startPage(int pageID); > void processColumn(ColumnBatch batch); > byte[] endPage(int pageID); > ColumnBatch decode(byte[] encoded); > } > > /** Compressor of the page data, the flow is encode->compress, and > decompress->decode */ > interface PageCompressor { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > byte[] compress(byte[] encodedData); > byte[] decompress(byte[] data); > } > > /** Calculate the statistics for a column page and blocklet */ > interface Statistics { > /** Codec name will be stored in BlockletHeader (DataChunk3) */ > String getName(); > void startPage(int pageID); > void endPage(int pageID); > void startBlocklet(int blockletID); > void endBlocklet(int blockletID); > > /** Update the stats for the input batch */ > void update(ColumnBatch batch); > > /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ > int[] getPageStatistisc(); > > /** Output will be written to Footer */ > int[] getBlockletStatistics(); > } > > And, there should be a partition step adding somewhere to support > partition feature (CARBONDATA-910), and it depends on whether we implement > this partition shuffling in spark layer or carbon layer. (before input step > or after conversion step). What is the current idea of this? @CaiQiang > @Lionel > > What you think about these improvements? > > Regards, > Jacky > > > > > |
reply inline
> 在 2017年4月26日,上午2:03,Vimal Das Kammath <[hidden email]> 写道: > > +1 jacky, Its a very good initiative. I think it will improve the > performance by reducing the GC overhead as the new approach could > potentially create lesser short lived objects. > > I have few concerns > 1) I could not follow the Sort improvement using row ID array saperately, > could you elaborate more on that. I think we do not need to sort the CarbonRow and swap its location in the array (Arrays.sort and there are multiple data copying happending in SortDataRows class). We only need to maintain a RowID array and sort this array according to the fields in SORT_COLUMNS option. And now I am thinking to move the columnar conversion to input step only so that after reading from data source then we can apply vecterized process in all steps after input step. > 2) For the multi traversal occuring in the write step, you had suggested > conversion of CarbonRow to ColumnarPage. By question is , won't this > conversion require multi-conversion? Yes, I think we should convert row to columnar at the beginning of the loading process. > > Regards > Vimal > > On Fri, Apr 21, 2017 at 12:20 PM, Jacky Li <[hidden email]> wrote: > >> I want to propose following improvement on data loading process: >> >> Currently different steps are using different data layout in CarbonRow, >> and it convert back and forth in different steps. It is not easy for >> developer to understand the data structure used in each steps and it >> increase the memory requirement as it is doing unnecessary data copying in >> some steps. So, suggest to improve it as following >> >> 1) input step: read input and create a CarbonRow with all fields are >> string type >> >> 2) convert step: convert string to byte[] according to its data type, >> this step has compression effect of the input row so it is good for saving >> memory and it also take cares of the null value >> if it is dictionary dimension then convert to surrogate key; >> if it is no dictionary then convert to byte[] representation; >> if it is complex dimension, then convert to byte[] representation; >> if it is measure then convert to Object, like Integer, Long, >> Double, according to schema —> change to byte[] instead of storing Object >> to avoid boxing/unboxing and save memory >> >> The conversion is happened by updating the field in CarbonRow in >> place, there should be no new CarbonRow created, however, there is a copy >> operation of the input CarbonRow, for bad record handling —> do not copy >> the row, convert it back to value if it is bad record >> >> 3) sort step: >> improvement 1: sort the collected input CarbonRow. Currently this is >> done by copying the row object into internal buffer and sort is done on the >> buffer. —> to avoid the copying of the CarbonRow, we should create this >> buffer (with RowID) in input step, and only output the sorted RowID (by >> swapping its value in the RowID array) according to its value. If it is a >> merge sort, then write to file based on this sorted RowID array when >> spilling to disk. So no copy of CarbonRow is required. >> >> improvement 2: when spilling to disk, currently it changes the field >> order in CarbonRow, it is writing as a 3-elements array, [global dictionary >> dimension, plain dimension and complex dimension, measure columns] , this >> is because the merger is expecting it like this —> I think this is >> unnecessary, we can add serialization/deserialization capability in >> CarbonRow and use CarbonRow instead. In the case of no-sort table, it also >> avoid this conversion in write step. >> >> 4) write step: >> currently it will collect one page of data (32K rows) and start a >> Producer which actually is the encode process of one page. In order to >> support parallel processing, after the page data is encoded then put it to >> a queue which will be taken by the Consumer, the Consumer will collect >> pages up to one blocklet size (configurable, say 64MB), and write to >> CarbonData files. >> >> improvement 1: there is an unnecessary data copy and re-ordering of >> the fields of the row. it converts the row to: [measure columns, plain >> dimension and complex dimension, global dictionary dimension] it is >> different from what sort step outputs. —> so suggest to use CarbonRow only. >> no new row object should be created here. >> >> improvement 2: there are multiple traversal of the page data in the >> code currently —> we should change to, firstly convert the CarbonRow to >> ColumnarPage which is the columnar representation for all columns in one >> page, and collect the start/end key and statistics when doing this columnar >> conversion. Then apply inverted index, RLE, compression process based on >> ColumnarPage object. >> >> improvement 3: we want to open up some interfaces for letting >> developer to add more page encoding, statistics, page compression. These >> interface will be like callbacks, so developer can write new >> encoding/statistics/compression method and carbon loading process will >> invoke it in this step. This interface will be like: >> >> /** >> * Codec for a column page, implementation should not keep state across >> pages, >> * caller will use the same object to encode multiple pages >> */ >> interface PageCodec { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> void startPage(int pageID); >> void processColumn(ColumnBatch batch); >> byte[] endPage(int pageID); >> ColumnBatch decode(byte[] encoded); >> } >> >> /** Compressor of the page data, the flow is encode->compress, and >> decompress->decode */ >> interface PageCompressor { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> byte[] compress(byte[] encodedData); >> byte[] decompress(byte[] data); >> } >> >> /** Calculate the statistics for a column page and blocklet */ >> interface Statistics { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> void startPage(int pageID); >> void endPage(int pageID); >> void startBlocklet(int blockletID); >> void endBlocklet(int blockletID); >> >> /** Update the stats for the input batch */ >> void update(ColumnBatch batch); >> >> /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ >> int[] getPageStatistisc(); >> >> /** Output will be written to Footer */ >> int[] getBlockletStatistics(); >> } >> >> And, there should be a partition step adding somewhere to support >> partition feature (CARBONDATA-910), and it depends on whether we implement >> this partition shuffling in spark layer or carbon layer. (before input step >> or after conversion step). What is the current idea of this? @CaiQiang >> @Lionel >> >> What you think about these improvements? >> >> Regards, >> Jacky >> >> >> >> >> |
In reply to this post by Liang Chen
Hi community,
More idea on the loading process, in my opinion the ideal one should be as following, please comment: 1. input step: - Do the parsing of input data, either CSV or Dataframe, they all convert into CarbonRow. - Buffering them to CarbonRowBatch - Prefetchiong of rows 2. convert step: - Convert fields based on the category of columns - for global dictionary encoded columns, convert them into dictionary id. - for primitive type columns, just keep as origin type. (no conversion) - for string and variable length data type columns (like complex columns), convert to LV (length-value) encoded byte array - Convert input row to columnar layout as we read from previous step. Create a ColumnPage for one column, and a DataPage for all columns. - The DataPage should store the key columns specified by SORT_COLUMNS together in one vector, so that it will be efficient for CPU cache while sorting 3. index-build step: - Based on configuration, there can be index-build step or without this step if SORT_COLUMNS is empty, which indicating no sort. - Add the DataPage into a Sorter, the sorter sorts the incoming DataPage on the fly. The sort result is array of rowId(pageId + offset). Avoid using Arrays.sort in JDK. - The sorter should support batch sort or merge sort. For merge sort, muliple spilling strategy can be implemented. 4. write step: - Calculate the start/end key of MDKey over the ColumnPage - Calculate the statistics of the ColumnPage. Currently in carbondata min/max stats is hard coded. As a first step of extension, we want to provide more stats like histogram and bloom filter so that user can choose from. Optional field need to be added in thrift definition in DataChunk struct - Encode and compress each column into byte array. To make encoding more trasparent to the user and extensible, we should first decouple the data type with the available codec options. So in the code can choose codec for specific data type by some strategy. For the strategy, we can firstly have a default one as beginning, later on we can open this interface for developer to extension in the future. (say, developer can choose codec based on some hueristic rules based on the data distribution) - After each column is encoded and compressed, follow the current Producer-Consumer logic to write to disk. As the bottlenet will be the merge sort, some thought on the merging strategy: Merge sort strategy: 1. spill both key columns and non-key columns in sorted order 2. spill key columns in sorted order and non-key columns in original order, and spill rowId also. Apply rowId mapping only in merger stage. This can avoid some random memory access on non-key columns. It is good if there are many non-key columns 3. do not spill key columns, and spill non-key columns in origin order. This is good as we can keep more key columns in memory Regards, Jacky > 在 2017年4月22日,下午8:08,Liang Chen <[hidden email]> 写道: > > Jacky, thank you list these constructive improvements of data loading. > > Agree to consider all these improvement points, only the below one i have > some concerns. > Before considering open interfaces for data loading, we need to more > clearly define block/blocklet/page which play what different roles, then we > could consider some interfaces for block level, some interfaces for > blocklet level, some for page level. > > Let us take up these improvements in the coming release. > -------------------------------------------------------------------------------------------------------------------- > improvement 3: we want to open up some interfaces for letting developer to > add more page encoding, statistics, page compression. These interface will > be like callbacks, so developer can write new encoding/statistics/compression > method and carbon loading process will invoke it in this step. This > interface will be like: > > Regards > Liang > > 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>: > >> I want to propose following improvement on data loading process: >> >> Currently different steps are using different data layout in CarbonRow, >> and it convert back and forth in different steps. It is not easy for >> developer to understand the data structure used in each steps and it >> increase the memory requirement as it is doing unnecessary data copying in >> some steps. So, suggest to improve it as following >> >> 1) input step: read input and create a CarbonRow with all fields are >> string type >> >> 2) convert step: convert string to byte[] according to its data type, >> this step has compression effect of the input row so it is good for saving >> memory and it also take cares of the null value >> if it is dictionary dimension then convert to surrogate key; >> if it is no dictionary then convert to byte[] representation; >> if it is complex dimension, then convert to byte[] representation; >> if it is measure then convert to Object, like Integer, Long, >> Double, according to schema —> change to byte[] instead of storing Object >> to avoid boxing/unboxing and save memory >> >> The conversion is happened by updating the field in CarbonRow in >> place, there should be no new CarbonRow created, however, there is a copy >> operation of the input CarbonRow, for bad record handling —> do not copy >> the row, convert it back to value if it is bad record >> >> 3) sort step: >> improvement 1: sort the collected input CarbonRow. Currently this is >> done by copying the row object into internal buffer and sort is done on the >> buffer. —> to avoid the copying of the CarbonRow, we should create this >> buffer (with RowID) in input step, and only output the sorted RowID (by >> swapping its value in the RowID array) according to its value. If it is a >> merge sort, then write to file based on this sorted RowID array when >> spilling to disk. So no copy of CarbonRow is required. >> >> improvement 2: when spilling to disk, currently it changes the field >> order in CarbonRow, it is writing as a 3-elements array, [global dictionary >> dimension, plain dimension and complex dimension, measure columns] , this >> is because the merger is expecting it like this —> I think this is >> unnecessary, we can add serialization/deserialization capability in >> CarbonRow and use CarbonRow instead. In the case of no-sort table, it also >> avoid this conversion in write step. >> >> 4) write step: >> currently it will collect one page of data (32K rows) and start a >> Producer which actually is the encode process of one page. In order to >> support parallel processing, after the page data is encoded then put it to >> a queue which will be taken by the Consumer, the Consumer will collect >> pages up to one blocklet size (configurable, say 64MB), and write to >> CarbonData files. >> >> improvement 1: there is an unnecessary data copy and re-ordering of >> the fields of the row. it converts the row to: [measure columns, plain >> dimension and complex dimension, global dictionary dimension] it is >> different from what sort step outputs. —> so suggest to use CarbonRow only. >> no new row object should be created here. >> >> improvement 2: there are multiple traversal of the page data in the >> code currently —> we should change to, firstly convert the CarbonRow to >> ColumnarPage which is the columnar representation for all columns in one >> page, and collect the start/end key and statistics when doing this columnar >> conversion. Then apply inverted index, RLE, compression process based on >> ColumnarPage object. >> >> improvement 3: we want to open up some interfaces for letting >> developer to add more page encoding, statistics, page compression. These >> interface will be like callbacks, so developer can write new >> encoding/statistics/compression method and carbon loading process will >> invoke it in this step. This interface will be like: >> >> /** >> * Codec for a column page, implementation should not keep state across >> pages, >> * caller will use the same object to encode multiple pages >> */ >> interface PageCodec { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> void startPage(int pageID); >> void processColumn(ColumnBatch batch); >> byte[] endPage(int pageID); >> ColumnBatch decode(byte[] encoded); >> } >> >> /** Compressor of the page data, the flow is encode->compress, and >> decompress->decode */ >> interface PageCompressor { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> byte[] compress(byte[] encodedData); >> byte[] decompress(byte[] data); >> } >> >> /** Calculate the statistics for a column page and blocklet */ >> interface Statistics { >> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >> String getName(); >> void startPage(int pageID); >> void endPage(int pageID); >> void startBlocklet(int blockletID); >> void endBlocklet(int blockletID); >> >> /** Update the stats for the input batch */ >> void update(ColumnBatch batch); >> >> /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ >> int[] getPageStatistisc(); >> >> /** Output will be written to Footer */ >> int[] getBlockletStatistics(); >> } >> >> And, there should be a partition step adding somewhere to support >> partition feature (CARBONDATA-910), and it depends on whether we implement >> this partition shuffling in spark layer or carbon layer. (before input step >> or after conversion step). What is the current idea of this? @CaiQiang >> @Lionel >> >> What you think about these improvements? >> >> Regards, >> Jacky >> >> >> >> >> > > > -- > Regards > Liang > |
Hi,
While I am working on data load improvement and encoding override feature, I found that it is not efficient to use the CarbonRow with Object[]. I think a better way is to use fix length primitive type instead of Object. Since currently SORT_COLUMNS is merged, I think it is possible to: 1. separate key columns and non key columns 2. use exact data type for all columns instead of Object 3. no need to differentiate dimension and measure while loading, encoding is based on default strategy or user override For example: TablePage { ColumnPage[] keyPages; ColumnPage[] nonKeyPages; } ColumnPage has following implementations: 1. FixtLengthColumnPage // internal data type: byte, short, int, long, float, double, dictionary encoding column also should use this class 2. StringColumnPage // internal data type: byte[] 3. DecimalColumnPage // internal data type: byte[] 4. ComplexColumnPage // internal data type: byte[] If user specify only dictionary column in SORT_COLUMNS and cardinality is less, it is possible that we can further optimize the keyPage member as long data type (can hold up to 4 columns, each column cardinality less than 65535), so that it can be fit in CPU cache and it will be more efficient while sorting. Regards, Jacky > 在 2017年4月27日,上午9:20,Jacky Li <[hidden email]> 写道: > > Hi community, > > More idea on the loading process, in my opinion the ideal one should be as following, please comment: > > 1. input step: > - Do the parsing of input data, either CSV or Dataframe, they all convert into CarbonRow. > - Buffering them to CarbonRowBatch > - Prefetchiong of rows > > 2. convert step: > - Convert fields based on the category of columns > - for global dictionary encoded columns, convert them into dictionary id. > - for primitive type columns, just keep as origin type. (no conversion) > - for string and variable length data type columns (like complex columns), convert to LV (length-value) encoded byte array > > - Convert input row to columnar layout as we read from previous step. Create a ColumnPage for one column, and a DataPage for all columns. > > - The DataPage should store the key columns specified by SORT_COLUMNS together in one vector, so that it will be efficient for CPU cache while sorting > > 3. index-build step: > - Based on configuration, there can be index-build step or without this step if SORT_COLUMNS is empty, which indicating no sort. > > - Add the DataPage into a Sorter, the sorter sorts the incoming DataPage on the fly. The sort result is array of rowId(pageId + offset). Avoid using Arrays.sort in JDK. > > - The sorter should support batch sort or merge sort. For merge sort, muliple spilling strategy can be implemented. > > 4. write step: > - Calculate the start/end key of MDKey over the ColumnPage > > - Calculate the statistics of the ColumnPage. Currently in carbondata min/max stats is hard coded. As a first step of extension, we want to provide more stats like histogram and bloom filter so that user can choose from. Optional field need to be added in thrift definition in DataChunk struct > > - Encode and compress each column into byte array. To make encoding more trasparent to the user and extensible, we should first decouple the data type with the available codec options. So in the code can choose codec for specific data type by some strategy. For the strategy, we can firstly have a default one as beginning, later on we can open this interface for developer to extension in the future. (say, developer can choose codec based on some hueristic rules based on the data distribution) > > - After each column is encoded and compressed, follow the current Producer-Consumer logic to write to disk. > > > As the bottlenet will be the merge sort, some thought on the merging strategy: > Merge sort strategy: > 1. spill both key columns and non-key columns in sorted order > 2. spill key columns in sorted order and non-key columns in original order, and spill rowId also. Apply rowId mapping only in merger stage. This can avoid some random memory access on non-key columns. It is good if there are many non-key columns > 3. do not spill key columns, and spill non-key columns in origin order. This is good as we can keep more key columns in memory > > > Regards, > Jacky > > >> 在 2017年4月22日,下午8:08,Liang Chen <[hidden email]> 写道: >> >> Jacky, thank you list these constructive improvements of data loading. >> >> Agree to consider all these improvement points, only the below one i have >> some concerns. >> Before considering open interfaces for data loading, we need to more >> clearly define block/blocklet/page which play what different roles, then we >> could consider some interfaces for block level, some interfaces for >> blocklet level, some for page level. >> >> Let us take up these improvements in the coming release. >> -------------------------------------------------------------------------------------------------------------------- >> improvement 3: we want to open up some interfaces for letting developer to >> add more page encoding, statistics, page compression. These interface will >> be like callbacks, so developer can write new encoding/statistics/compression >> method and carbon loading process will invoke it in this step. This >> interface will be like: >> >> Regards >> Liang >> >> 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>: >> >>> I want to propose following improvement on data loading process: >>> >>> Currently different steps are using different data layout in CarbonRow, >>> and it convert back and forth in different steps. It is not easy for >>> developer to understand the data structure used in each steps and it >>> increase the memory requirement as it is doing unnecessary data copying in >>> some steps. So, suggest to improve it as following >>> >>> 1) input step: read input and create a CarbonRow with all fields are >>> string type >>> >>> 2) convert step: convert string to byte[] according to its data type, >>> this step has compression effect of the input row so it is good for saving >>> memory and it also take cares of the null value >>> if it is dictionary dimension then convert to surrogate key; >>> if it is no dictionary then convert to byte[] representation; >>> if it is complex dimension, then convert to byte[] representation; >>> if it is measure then convert to Object, like Integer, Long, >>> Double, according to schema —> change to byte[] instead of storing Object >>> to avoid boxing/unboxing and save memory >>> >>> The conversion is happened by updating the field in CarbonRow in >>> place, there should be no new CarbonRow created, however, there is a copy >>> operation of the input CarbonRow, for bad record handling —> do not copy >>> the row, convert it back to value if it is bad record >>> >>> 3) sort step: >>> improvement 1: sort the collected input CarbonRow. Currently this is >>> done by copying the row object into internal buffer and sort is done on the >>> buffer. —> to avoid the copying of the CarbonRow, we should create this >>> buffer (with RowID) in input step, and only output the sorted RowID (by >>> swapping its value in the RowID array) according to its value. If it is a >>> merge sort, then write to file based on this sorted RowID array when >>> spilling to disk. So no copy of CarbonRow is required. >>> >>> improvement 2: when spilling to disk, currently it changes the field >>> order in CarbonRow, it is writing as a 3-elements array, [global dictionary >>> dimension, plain dimension and complex dimension, measure columns] , this >>> is because the merger is expecting it like this —> I think this is >>> unnecessary, we can add serialization/deserialization capability in >>> CarbonRow and use CarbonRow instead. In the case of no-sort table, it also >>> avoid this conversion in write step. >>> >>> 4) write step: >>> currently it will collect one page of data (32K rows) and start a >>> Producer which actually is the encode process of one page. In order to >>> support parallel processing, after the page data is encoded then put it to >>> a queue which will be taken by the Consumer, the Consumer will collect >>> pages up to one blocklet size (configurable, say 64MB), and write to >>> CarbonData files. >>> >>> improvement 1: there is an unnecessary data copy and re-ordering of >>> the fields of the row. it converts the row to: [measure columns, plain >>> dimension and complex dimension, global dictionary dimension] it is >>> different from what sort step outputs. —> so suggest to use CarbonRow only. >>> no new row object should be created here. >>> >>> improvement 2: there are multiple traversal of the page data in the >>> code currently —> we should change to, firstly convert the CarbonRow to >>> ColumnarPage which is the columnar representation for all columns in one >>> page, and collect the start/end key and statistics when doing this columnar >>> conversion. Then apply inverted index, RLE, compression process based on >>> ColumnarPage object. >>> >>> improvement 3: we want to open up some interfaces for letting >>> developer to add more page encoding, statistics, page compression. These >>> interface will be like callbacks, so developer can write new >>> encoding/statistics/compression method and carbon loading process will >>> invoke it in this step. This interface will be like: >>> >>> /** >>> * Codec for a column page, implementation should not keep state across >>> pages, >>> * caller will use the same object to encode multiple pages >>> */ >>> interface PageCodec { >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>> String getName(); >>> void startPage(int pageID); >>> void processColumn(ColumnBatch batch); >>> byte[] endPage(int pageID); >>> ColumnBatch decode(byte[] encoded); >>> } >>> >>> /** Compressor of the page data, the flow is encode->compress, and >>> decompress->decode */ >>> interface PageCompressor { >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>> String getName(); >>> byte[] compress(byte[] encodedData); >>> byte[] decompress(byte[] data); >>> } >>> >>> /** Calculate the statistics for a column page and blocklet */ >>> interface Statistics { >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>> String getName(); >>> void startPage(int pageID); >>> void endPage(int pageID); >>> void startBlocklet(int blockletID); >>> void endBlocklet(int blockletID); >>> >>> /** Update the stats for the input batch */ >>> void update(ColumnBatch batch); >>> >>> /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) */ >>> int[] getPageStatistisc(); >>> >>> /** Output will be written to Footer */ >>> int[] getBlockletStatistics(); >>> } >>> >>> And, there should be a partition step adding somewhere to support >>> partition feature (CARBONDATA-910), and it depends on whether we implement >>> this partition shuffling in spark layer or carbon layer. (before input step >>> or after conversion step). What is the current idea of this? @CaiQiang >>> @Lionel >>> >>> What you think about these improvements? >>> >>> Regards, >>> Jacky >>> >>> >>> >>> >>> >> >> >> -- >> Regards >> Liang >> > > > |
As I known, System.arrayCopy of object array is a shallow copy, so I think both KeyPage and TablePage maybe have the same performance on Arrays.sort.
Best Regards
David Cai |
In reply to this post by Jacky Li
Hi,
Using Object[] as a row while loading is not efficient in terms of memory usage. It would be more efficient to keep them in unsafe as it can keep the data in more compacted way as per data type. And regarding sorting it would be good to concentrate on single sorting solution. Since we already have stable unsafe sort we should better consider removing other normal sorting implementation to reduce the maintaining cost. And concentrate more on improving the unsafe sort. As per the observation GC is reduced a lot in case of unsafe sort but still some GC is observed, that is I think because of writer step as it still holds the data for some time on heap for sorting and writing the data in blocklet wise. Even this data also we can consider keep in unsafe to avoid GC. Regards, Ravindra. On 21 May 2017 at 23:15, Jacky Li <[hidden email]> wrote: > Hi, > > While I am working on data load improvement and encoding override feature, > I found that it is not efficient to use the CarbonRow with Object[]. I > think a better way is to use fix length primitive type instead of Object. > > Since currently SORT_COLUMNS is merged, I think it is possible to: > 1. separate key columns and non key columns > 2. use exact data type for all columns instead of Object > 3. no need to differentiate dimension and measure while loading, encoding > is based on default strategy or user override > > For example: > > TablePage { > ColumnPage[] keyPages; > ColumnPage[] nonKeyPages; > } > > ColumnPage has following implementations: > 1. FixtLengthColumnPage // internal data type: byte, short, int, long, > float, double, dictionary encoding column also should use this class > 2. StringColumnPage // internal data type: byte[] > 3. DecimalColumnPage // internal data type: byte[] > 4. ComplexColumnPage // internal data type: byte[] > > If user specify only dictionary column in SORT_COLUMNS and cardinality is > less, it is possible that we can further optimize the keyPage member as > long data type (can hold up to 4 columns, each column cardinality less than > 65535), so that it can be fit in CPU cache and it will be more efficient > while sorting. > > Regards, > Jacky > > > 在 2017年4月27日,上午9:20,Jacky Li <[hidden email]> 写道: > > > > Hi community, > > > > More idea on the loading process, in my opinion the ideal one should be > as following, please comment: > > > > 1. input step: > > - Do the parsing of input data, either CSV or Dataframe, they all > convert into CarbonRow. > > - Buffering them to CarbonRowBatch > > - Prefetchiong of rows > > > > 2. convert step: > > - Convert fields based on the category of columns > > - for global dictionary encoded columns, convert them into dictionary > id. > > - for primitive type columns, just keep as origin type. (no conversion) > > - for string and variable length data type columns (like complex > columns), convert to LV (length-value) encoded byte array > > > > - Convert input row to columnar layout as we read from previous step. > Create a ColumnPage for one column, and a DataPage for all columns. > > > > - The DataPage should store the key columns specified by SORT_COLUMNS > together in one vector, so that it will be efficient for CPU cache while > sorting > > > > 3. index-build step: > > - Based on configuration, there can be index-build step or without this > step if SORT_COLUMNS is empty, which indicating no sort. > > > > - Add the DataPage into a Sorter, the sorter sorts the incoming DataPage > on the fly. The sort result is array of rowId(pageId + offset). Avoid using > Arrays.sort in JDK. > > > > - The sorter should support batch sort or merge sort. For merge sort, > muliple spilling strategy can be implemented. > > > > 4. write step: > > - Calculate the start/end key of MDKey over the ColumnPage > > > > - Calculate the statistics of the ColumnPage. Currently in carbondata > min/max stats is hard coded. As a first step of extension, we want to > provide more stats like histogram and bloom filter so that user can choose > from. Optional field need to be added in thrift definition in DataChunk > struct > > > > - Encode and compress each column into byte array. To make encoding more > trasparent to the user and extensible, we should first decouple the data > type with the available codec options. So in the code can choose codec for > specific data type by some strategy. For the strategy, we can firstly have > a default one as beginning, later on we can open this interface for > developer to extension in the future. (say, developer can choose codec > based on some hueristic rules based on the data distribution) > > > > - After each column is encoded and compressed, follow the current > Producer-Consumer logic to write to disk. > > > > > > As the bottlenet will be the merge sort, some thought on the merging > strategy: > > Merge sort strategy: > > 1. spill both key columns and non-key columns in sorted order > > 2. spill key columns in sorted order and non-key columns in original > order, and spill rowId also. Apply rowId mapping only in merger stage. This > can avoid some random memory access on non-key columns. It is good if there > are many non-key columns > > 3. do not spill key columns, and spill non-key columns in origin order. > This is good as we can keep more key columns in memory > > > > > > Regards, > > Jacky > > > > > >> 在 2017年4月22日,下午8:08,Liang Chen <[hidden email]> 写道: > >> > >> Jacky, thank you list these constructive improvements of data loading. > >> > >> Agree to consider all these improvement points, only the below one i > have > >> some concerns. > >> Before considering open interfaces for data loading, we need to more > >> clearly define block/blocklet/page which play what different roles, > then we > >> could consider some interfaces for block level, some interfaces for > >> blocklet level, some for page level. > >> > >> Let us take up these improvements in the coming release. > >> ------------------------------------------------------------ > -------------------------------------------------------- > >> improvement 3: we want to open up some interfaces for letting developer > to > >> add more page encoding, statistics, page compression. These interface > will > >> be like callbacks, so developer can write new encoding/statistics/ > compression > >> method and carbon loading process will invoke it in this step. This > >> interface will be like: > >> > >> Regards > >> Liang > >> > >> 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>: > >> > >>> I want to propose following improvement on data loading process: > >>> > >>> Currently different steps are using different data layout in CarbonRow, > >>> and it convert back and forth in different steps. It is not easy for > >>> developer to understand the data structure used in each steps and it > >>> increase the memory requirement as it is doing unnecessary data > copying in > >>> some steps. So, suggest to improve it as following > >>> > >>> 1) input step: read input and create a CarbonRow with all fields are > >>> string type > >>> > >>> 2) convert step: convert string to byte[] according to its data type, > >>> this step has compression effect of the input row so it is good for > saving > >>> memory and it also take cares of the null value > >>> if it is dictionary dimension then convert to surrogate key; > >>> if it is no dictionary then convert to byte[] representation; > >>> if it is complex dimension, then convert to byte[] > representation; > >>> if it is measure then convert to Object, like Integer, Long, > >>> Double, according to schema —> change to byte[] instead of storing > Object > >>> to avoid boxing/unboxing and save memory > >>> > >>> The conversion is happened by updating the field in CarbonRow in > >>> place, there should be no new CarbonRow created, however, there is a > copy > >>> operation of the input CarbonRow, for bad record handling —> do not > copy > >>> the row, convert it back to value if it is bad record > >>> > >>> 3) sort step: > >>> improvement 1: sort the collected input CarbonRow. Currently this > is > >>> done by copying the row object into internal buffer and sort is done > on the > >>> buffer. —> to avoid the copying of the CarbonRow, we should create this > >>> buffer (with RowID) in input step, and only output the sorted RowID (by > >>> swapping its value in the RowID array) according to its value. If it > is a > >>> merge sort, then write to file based on this sorted RowID array when > >>> spilling to disk. So no copy of CarbonRow is required. > >>> > >>> improvement 2: when spilling to disk, currently it changes the field > >>> order in CarbonRow, it is writing as a 3-elements array, [global > dictionary > >>> dimension, plain dimension and complex dimension, measure columns] , > this > >>> is because the merger is expecting it like this —> I think this is > >>> unnecessary, we can add serialization/deserialization capability in > >>> CarbonRow and use CarbonRow instead. In the case of no-sort table, it > also > >>> avoid this conversion in write step. > >>> > >>> 4) write step: > >>> currently it will collect one page of data (32K rows) and start a > >>> Producer which actually is the encode process of one page. In order to > >>> support parallel processing, after the page data is encoded then put > it to > >>> a queue which will be taken by the Consumer, the Consumer will collect > >>> pages up to one blocklet size (configurable, say 64MB), and write to > >>> CarbonData files. > >>> > >>> improvement 1: there is an unnecessary data copy and re-ordering of > >>> the fields of the row. it converts the row to: [measure columns, plain > >>> dimension and complex dimension, global dictionary dimension] it is > >>> different from what sort step outputs. —> so suggest to use CarbonRow > only. > >>> no new row object should be created here. > >>> > >>> improvement 2: there are multiple traversal of the page data in the > >>> code currently —> we should change to, firstly convert the CarbonRow to > >>> ColumnarPage which is the columnar representation for all columns in > one > >>> page, and collect the start/end key and statistics when doing this > columnar > >>> conversion. Then apply inverted index, RLE, compression process based > on > >>> ColumnarPage object. > >>> > >>> improvement 3: we want to open up some interfaces for letting > >>> developer to add more page encoding, statistics, page compression. > These > >>> interface will be like callbacks, so developer can write new > >>> encoding/statistics/compression method and carbon loading process will > >>> invoke it in this step. This interface will be like: > >>> > >>> /** > >>> * Codec for a column page, implementation should not keep state across > >>> pages, > >>> * caller will use the same object to encode multiple pages > >>> */ > >>> interface PageCodec { > >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ > >>> String getName(); > >>> void startPage(int pageID); > >>> void processColumn(ColumnBatch batch); > >>> byte[] endPage(int pageID); > >>> ColumnBatch decode(byte[] encoded); > >>> } > >>> > >>> /** Compressor of the page data, the flow is encode->compress, and > >>> decompress->decode */ > >>> interface PageCompressor { > >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ > >>> String getName(); > >>> byte[] compress(byte[] encodedData); > >>> byte[] decompress(byte[] data); > >>> } > >>> > >>> /** Calculate the statistics for a column page and blocklet */ > >>> interface Statistics { > >>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ > >>> String getName(); > >>> void startPage(int pageID); > >>> void endPage(int pageID); > >>> void startBlocklet(int blockletID); > >>> void endBlocklet(int blockletID); > >>> > >>> /** Update the stats for the input batch */ > >>> void update(ColumnBatch batch); > >>> > >>> /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) > */ > >>> int[] getPageStatistisc(); > >>> > >>> /** Output will be written to Footer */ > >>> int[] getBlockletStatistics(); > >>> } > >>> > >>> And, there should be a partition step adding somewhere to support > >>> partition feature (CARBONDATA-910), and it depends on whether we > implement > >>> this partition shuffling in spark layer or carbon layer. (before input > step > >>> or after conversion step). What is the current idea of this? @CaiQiang > >>> @Lionel > >>> > >>> What you think about these improvements? > >>> > >>> Regards, > >>> Jacky > >>> > >>> > >>> > >>> > >>> > >> > >> > >> -- > >> Regards > >> Liang > >> > > > > > > > > > > -- Thanks & Regards, Ravi |
In reply to this post by David CaiQiang
For sorting, I think more optimization we can do, I am currently thinking these:
1. Do not sort the whole TablePage, only KeyPage is required as the sort key 2. Should find a more memory efficient sorting algorithm than System.arraycopy which requires doubling space. 3. Should try to hold the KeyPage as well as the RowId in a compact data structure, it is best if it fits in CPU cache. Modern L3 CPU is larger than 8MB. For this, I am thinking to have a 8 bytes encoded format that includes RowID and SORT_COLUMNS (partial or full), for example, 2 bytes for RowId, remaining 6 bytes for 2 to 3 columns after dictionary encoding. a) If we can hold the RowID + whole SORT_COLUMNS in 8 bytes, it will be most efficient to leverage CPU cache to do sorting, use in-place update approach while sorting. So no extra storage is needed, and the RowID + whole SORT_COLUMNS will be sorted. b) If we can only hold the RowID + partial SORT_COLUMNS in 8 bytes, we can employ strategy like the sorting in Spark Tungsten project. (first compare the 8 bytes in cache, if it equals then compare remaining bytes in memory) Regards, Jacky > 在 2017年5月22日,上午10:19,David CaiQiang <[hidden email]> 写道: > > As I known, System.arrayCopy of object array is a shallow copy, so I think > both KeyPage and TablePage maybe have the same performance on Arrays.sort. > > > ----- > Best Regards > David Cai > -- > View this message in context: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/DISCUSS-Data-loading-improvement-tp11429p13056.html > Sent from the Apache CarbonData Dev Mailing List archive mailing list archive at Nabble.com. |
In reply to this post by ravipesala
Yes, Ravindra, unsafe sort will be better.
In my last mail, I mentioned a 8-bytes encoded format for RowID + SORT_COLUMNS, if SORT_COLUMNS are dictionary encoded, I think it is effectively like unsafe which is only type of byte[8], right? So we can do this by ourselves instead of depending on 3rd party library (sum.misc.Unsafe) ? Regards, Jacky > 在 2017年5月22日,上午11:54,Ravindra Pesala <[hidden email]> 写道: > > Hi, > > Using Object[] as a row while loading is not efficient in terms of memory > usage. It would be more efficient to keep them in unsafe as it can keep the > data in more compacted way as per data type. > > And regarding sorting it would be good to concentrate on single sorting > solution. Since we already have stable unsafe sort we should better > consider removing other normal sorting implementation to reduce the > maintaining cost. And concentrate more on improving the unsafe sort. > > As per the observation GC is reduced a lot in case of unsafe sort but still > some GC is observed, that is I think because of writer step as it still > holds the data for some time on heap for sorting and writing the data in > blocklet wise. Even this data also we can consider keep in unsafe to avoid > GC. > > > Regards, > Ravindra. > > On 21 May 2017 at 23:15, Jacky Li <[hidden email]> wrote: > >> Hi, >> >> While I am working on data load improvement and encoding override feature, >> I found that it is not efficient to use the CarbonRow with Object[]. I >> think a better way is to use fix length primitive type instead of Object. >> >> Since currently SORT_COLUMNS is merged, I think it is possible to: >> 1. separate key columns and non key columns >> 2. use exact data type for all columns instead of Object >> 3. no need to differentiate dimension and measure while loading, encoding >> is based on default strategy or user override >> >> For example: >> >> TablePage { >> ColumnPage[] keyPages; >> ColumnPage[] nonKeyPages; >> } >> >> ColumnPage has following implementations: >> 1. FixtLengthColumnPage // internal data type: byte, short, int, long, >> float, double, dictionary encoding column also should use this class >> 2. StringColumnPage // internal data type: byte[] >> 3. DecimalColumnPage // internal data type: byte[] >> 4. ComplexColumnPage // internal data type: byte[] >> >> If user specify only dictionary column in SORT_COLUMNS and cardinality is >> less, it is possible that we can further optimize the keyPage member as >> long data type (can hold up to 4 columns, each column cardinality less than >> 65535), so that it can be fit in CPU cache and it will be more efficient >> while sorting. >> >> Regards, >> Jacky >> >>> 在 2017年4月27日,上午9:20,Jacky Li <[hidden email]> 写道: >>> >>> Hi community, >>> >>> More idea on the loading process, in my opinion the ideal one should be >> as following, please comment: >>> >>> 1. input step: >>> - Do the parsing of input data, either CSV or Dataframe, they all >> convert into CarbonRow. >>> - Buffering them to CarbonRowBatch >>> - Prefetchiong of rows >>> >>> 2. convert step: >>> - Convert fields based on the category of columns >>> - for global dictionary encoded columns, convert them into dictionary >> id. >>> - for primitive type columns, just keep as origin type. (no conversion) >>> - for string and variable length data type columns (like complex >> columns), convert to LV (length-value) encoded byte array >>> >>> - Convert input row to columnar layout as we read from previous step. >> Create a ColumnPage for one column, and a DataPage for all columns. >>> >>> - The DataPage should store the key columns specified by SORT_COLUMNS >> together in one vector, so that it will be efficient for CPU cache while >> sorting >>> >>> 3. index-build step: >>> - Based on configuration, there can be index-build step or without this >> step if SORT_COLUMNS is empty, which indicating no sort. >>> >>> - Add the DataPage into a Sorter, the sorter sorts the incoming DataPage >> on the fly. The sort result is array of rowId(pageId + offset). Avoid using >> Arrays.sort in JDK. >>> >>> - The sorter should support batch sort or merge sort. For merge sort, >> muliple spilling strategy can be implemented. >>> >>> 4. write step: >>> - Calculate the start/end key of MDKey over the ColumnPage >>> >>> - Calculate the statistics of the ColumnPage. Currently in carbondata >> min/max stats is hard coded. As a first step of extension, we want to >> provide more stats like histogram and bloom filter so that user can choose >> from. Optional field need to be added in thrift definition in DataChunk >> struct >>> >>> - Encode and compress each column into byte array. To make encoding more >> trasparent to the user and extensible, we should first decouple the data >> type with the available codec options. So in the code can choose codec for >> specific data type by some strategy. For the strategy, we can firstly have >> a default one as beginning, later on we can open this interface for >> developer to extension in the future. (say, developer can choose codec >> based on some hueristic rules based on the data distribution) >>> >>> - After each column is encoded and compressed, follow the current >> Producer-Consumer logic to write to disk. >>> >>> >>> As the bottlenet will be the merge sort, some thought on the merging >> strategy: >>> Merge sort strategy: >>> 1. spill both key columns and non-key columns in sorted order >>> 2. spill key columns in sorted order and non-key columns in original >> order, and spill rowId also. Apply rowId mapping only in merger stage. This >> can avoid some random memory access on non-key columns. It is good if there >> are many non-key columns >>> 3. do not spill key columns, and spill non-key columns in origin order. >> This is good as we can keep more key columns in memory >>> >>> >>> Regards, >>> Jacky >>> >>> >>>> 在 2017年4月22日,下午8:08,Liang Chen <[hidden email]> 写道: >>>> >>>> Jacky, thank you list these constructive improvements of data loading. >>>> >>>> Agree to consider all these improvement points, only the below one i >> have >>>> some concerns. >>>> Before considering open interfaces for data loading, we need to more >>>> clearly define block/blocklet/page which play what different roles, >> then we >>>> could consider some interfaces for block level, some interfaces for >>>> blocklet level, some for page level. >>>> >>>> Let us take up these improvements in the coming release. >>>> ------------------------------------------------------------ >> -------------------------------------------------------- >>>> improvement 3: we want to open up some interfaces for letting developer >> to >>>> add more page encoding, statistics, page compression. These interface >> will >>>> be like callbacks, so developer can write new encoding/statistics/ >> compression >>>> method and carbon loading process will invoke it in this step. This >>>> interface will be like: >>>> >>>> Regards >>>> Liang >>>> >>>> 2017-04-21 14:50 GMT+08:00 Jacky Li <[hidden email]>: >>>> >>>>> I want to propose following improvement on data loading process: >>>>> >>>>> Currently different steps are using different data layout in CarbonRow, >>>>> and it convert back and forth in different steps. It is not easy for >>>>> developer to understand the data structure used in each steps and it >>>>> increase the memory requirement as it is doing unnecessary data >> copying in >>>>> some steps. So, suggest to improve it as following >>>>> >>>>> 1) input step: read input and create a CarbonRow with all fields are >>>>> string type >>>>> >>>>> 2) convert step: convert string to byte[] according to its data type, >>>>> this step has compression effect of the input row so it is good for >> saving >>>>> memory and it also take cares of the null value >>>>> if it is dictionary dimension then convert to surrogate key; >>>>> if it is no dictionary then convert to byte[] representation; >>>>> if it is complex dimension, then convert to byte[] >> representation; >>>>> if it is measure then convert to Object, like Integer, Long, >>>>> Double, according to schema —> change to byte[] instead of storing >> Object >>>>> to avoid boxing/unboxing and save memory >>>>> >>>>> The conversion is happened by updating the field in CarbonRow in >>>>> place, there should be no new CarbonRow created, however, there is a >> copy >>>>> operation of the input CarbonRow, for bad record handling —> do not >> copy >>>>> the row, convert it back to value if it is bad record >>>>> >>>>> 3) sort step: >>>>> improvement 1: sort the collected input CarbonRow. Currently this >> is >>>>> done by copying the row object into internal buffer and sort is done >> on the >>>>> buffer. —> to avoid the copying of the CarbonRow, we should create this >>>>> buffer (with RowID) in input step, and only output the sorted RowID (by >>>>> swapping its value in the RowID array) according to its value. If it >> is a >>>>> merge sort, then write to file based on this sorted RowID array when >>>>> spilling to disk. So no copy of CarbonRow is required. >>>>> >>>>> improvement 2: when spilling to disk, currently it changes the field >>>>> order in CarbonRow, it is writing as a 3-elements array, [global >> dictionary >>>>> dimension, plain dimension and complex dimension, measure columns] , >> this >>>>> is because the merger is expecting it like this —> I think this is >>>>> unnecessary, we can add serialization/deserialization capability in >>>>> CarbonRow and use CarbonRow instead. In the case of no-sort table, it >> also >>>>> avoid this conversion in write step. >>>>> >>>>> 4) write step: >>>>> currently it will collect one page of data (32K rows) and start a >>>>> Producer which actually is the encode process of one page. In order to >>>>> support parallel processing, after the page data is encoded then put >> it to >>>>> a queue which will be taken by the Consumer, the Consumer will collect >>>>> pages up to one blocklet size (configurable, say 64MB), and write to >>>>> CarbonData files. >>>>> >>>>> improvement 1: there is an unnecessary data copy and re-ordering of >>>>> the fields of the row. it converts the row to: [measure columns, plain >>>>> dimension and complex dimension, global dictionary dimension] it is >>>>> different from what sort step outputs. —> so suggest to use CarbonRow >> only. >>>>> no new row object should be created here. >>>>> >>>>> improvement 2: there are multiple traversal of the page data in the >>>>> code currently —> we should change to, firstly convert the CarbonRow to >>>>> ColumnarPage which is the columnar representation for all columns in >> one >>>>> page, and collect the start/end key and statistics when doing this >> columnar >>>>> conversion. Then apply inverted index, RLE, compression process based >> on >>>>> ColumnarPage object. >>>>> >>>>> improvement 3: we want to open up some interfaces for letting >>>>> developer to add more page encoding, statistics, page compression. >> These >>>>> interface will be like callbacks, so developer can write new >>>>> encoding/statistics/compression method and carbon loading process will >>>>> invoke it in this step. This interface will be like: >>>>> >>>>> /** >>>>> * Codec for a column page, implementation should not keep state across >>>>> pages, >>>>> * caller will use the same object to encode multiple pages >>>>> */ >>>>> interface PageCodec { >>>>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>>>> String getName(); >>>>> void startPage(int pageID); >>>>> void processColumn(ColumnBatch batch); >>>>> byte[] endPage(int pageID); >>>>> ColumnBatch decode(byte[] encoded); >>>>> } >>>>> >>>>> /** Compressor of the page data, the flow is encode->compress, and >>>>> decompress->decode */ >>>>> interface PageCompressor { >>>>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>>>> String getName(); >>>>> byte[] compress(byte[] encodedData); >>>>> byte[] decompress(byte[] data); >>>>> } >>>>> >>>>> /** Calculate the statistics for a column page and blocklet */ >>>>> interface Statistics { >>>>> /** Codec name will be stored in BlockletHeader (DataChunk3) */ >>>>> String getName(); >>>>> void startPage(int pageID); >>>>> void endPage(int pageID); >>>>> void startBlocklet(int blockletID); >>>>> void endBlocklet(int blockletID); >>>>> >>>>> /** Update the stats for the input batch */ >>>>> void update(ColumnBatch batch); >>>>> >>>>> /** Ouput will be written to DataChunk2 in BlockletHeader (DataChunk3) >> */ >>>>> int[] getPageStatistisc(); >>>>> >>>>> /** Output will be written to Footer */ >>>>> int[] getBlockletStatistics(); >>>>> } >>>>> >>>>> And, there should be a partition step adding somewhere to support >>>>> partition feature (CARBONDATA-910), and it depends on whether we >> implement >>>>> this partition shuffling in spark layer or carbon layer. (before input >> step >>>>> or after conversion step). What is the current idea of this? @CaiQiang >>>>> @Lionel >>>>> >>>>> What you think about these improvements? >>>>> >>>>> Regards, >>>>> Jacky >>>>> >>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> Regards >>>> Liang >>>> >>> >>> >>> >> >> >> >> > > > -- > Thanks & Regards, > Ravi > |
In reply to this post by Jacky Li
Hi,
I think you are referring to tungsten sort, there they tried keep pointer and key together to simulate cache aware computation. It is only possible if the sort keys are always starts with fixed keys like dictionary keys. So basically first encountered few dictionary columns can be kept along with pointer and starts sorting, if that is equal then we can go and retrieve remaining key and compare it. It is simple to implement in our current design as our current implementation of unsafe sort is also inspired from tungsten sort. Regards, Ravindra. On 22 May 2017 at 09:31, Jacky Li <[hidden email]> wrote: > For sorting, I think more optimization we can do, I am currently thinking > these: > 1. Do not sort the whole TablePage, only KeyPage is required as the sort > key > > 2. Should find a more memory efficient sorting algorithm than > System.arraycopy which requires doubling space. > > 3. Should try to hold the KeyPage as well as the RowId in a compact data > structure, it is best if it fits in CPU cache. Modern L3 CPU is larger than > 8MB. For this, I am thinking to have a 8 bytes encoded format that includes > RowID and SORT_COLUMNS (partial or full), for example, 2 bytes for RowId, > remaining 6 bytes for 2 to 3 columns after dictionary encoding. > a) If we can hold the RowID + whole SORT_COLUMNS in 8 bytes, it will > be most efficient to leverage CPU cache to do sorting, use in-place update > approach while sorting. So no extra storage is needed, and the RowID + > whole SORT_COLUMNS will be sorted. > b) If we can only hold the RowID + partial SORT_COLUMNS in 8 bytes, > we can employ strategy like the sorting in Spark Tungsten project. (first > compare the 8 bytes in cache, if it equals then compare remaining bytes in > memory) > > Regards, > Jacky > > > 在 2017年5月22日,上午10:19,David CaiQiang <[hidden email]> 写道: > > > > As I known, System.arrayCopy of object array is a shallow copy, so I > think > > both KeyPage and TablePage maybe have the same performance on > Arrays.sort. > > > > > > ----- > > Best Regards > > David Cai > > -- > > View this message in context: http://apache-carbondata-dev-m > ailing-list-archive.1130556.n5.nabble.com/DISCUSS-Data-loadi > ng-improvement-tp11429p13056.html > > Sent from the Apache CarbonData Dev Mailing List archive mailing list > archive at Nabble.com. > > -- Thanks & Regards, Ravi |
Yes, and if after dictionary encoding, SORT_COLUMNS can fit in 6 bytes, our approach can be even better, because the 8 bytes data can be put in cache totally, without the remaining portion in memory.
Regards, Jacky > 在 2017年5月22日,下午5:23,Ravindra Pesala <[hidden email]> 写道: > > Hi, > > I think you are referring to tungsten sort, there they tried keep pointer > and key together to simulate cache aware computation. It is only possible > if the sort keys are always starts with fixed keys like dictionary keys. So > basically first encountered few dictionary columns can be kept along with > pointer and starts sorting, if that is equal then we can go and retrieve > remaining key and compare it. > It is simple to implement in our current design as our current > implementation of unsafe sort is also inspired from tungsten sort. > > Regards, > Ravindra. > > On 22 May 2017 at 09:31, Jacky Li <[hidden email]> wrote: > >> For sorting, I think more optimization we can do, I am currently thinking >> these: >> 1. Do not sort the whole TablePage, only KeyPage is required as the sort >> key >> >> 2. Should find a more memory efficient sorting algorithm than >> System.arraycopy which requires doubling space. >> >> 3. Should try to hold the KeyPage as well as the RowId in a compact data >> structure, it is best if it fits in CPU cache. Modern L3 CPU is larger than >> 8MB. For this, I am thinking to have a 8 bytes encoded format that includes >> RowID and SORT_COLUMNS (partial or full), for example, 2 bytes for RowId, >> remaining 6 bytes for 2 to 3 columns after dictionary encoding. >> a) If we can hold the RowID + whole SORT_COLUMNS in 8 bytes, it will >> be most efficient to leverage CPU cache to do sorting, use in-place update >> approach while sorting. So no extra storage is needed, and the RowID + >> whole SORT_COLUMNS will be sorted. >> b) If we can only hold the RowID + partial SORT_COLUMNS in 8 bytes, >> we can employ strategy like the sorting in Spark Tungsten project. (first >> compare the 8 bytes in cache, if it equals then compare remaining bytes in >> memory) >> >> Regards, >> Jacky >> >>> 在 2017年5月22日,上午10:19,David CaiQiang <[hidden email]> 写道: >>> >>> As I known, System.arrayCopy of object array is a shallow copy, so I >> think >>> both KeyPage and TablePage maybe have the same performance on >> Arrays.sort. >>> >>> >>> ----- >>> Best Regards >>> David Cai >>> -- >>> View this message in context: http://apache-carbondata-dev-m >> ailing-list-archive.1130556.n5.nabble.com/DISCUSS-Data-loadi >> ng-improvement-tp11429p13056.html >>> Sent from the Apache CarbonData Dev Mailing List archive mailing list >> archive at Nabble.com. >> >> > > > -- > Thanks & Regards, > Ravi |
Hi All,
Do we have some statistics about current bottlenecks which part is taking more time?? *@Ravindra* Please correct me If I am wrong I think our current unsafe sort is also in-place it is only swapping the offsets not data. Only from comparison it is getting the data from off-heap to on-heap. Our current *GC* bottleneck is in writing step, there we are keeping the data for some time. There also we can store the data in off-heap and we can go for unsafe sort while sorting the column level data. It's better to get the current bottlenecks and proceed further. -Regards Kumar Vishal On Mon, May 22, 2017 at 4:37 PM, Jacky Li <[hidden email]> wrote: > Yes, and if after dictionary encoding, SORT_COLUMNS can fit in 6 bytes, > our approach can be even better, because the 8 bytes data can be put in > cache totally, without the remaining portion in memory. > > Regards, > Jacky > > > 在 2017年5月22日,下午5:23,Ravindra Pesala <[hidden email]> 写道: > > > > Hi, > > > > I think you are referring to tungsten sort, there they tried keep pointer > > and key together to simulate cache aware computation. It is only possible > > if the sort keys are always starts with fixed keys like dictionary keys. > So > > basically first encountered few dictionary columns can be kept along with > > pointer and starts sorting, if that is equal then we can go and retrieve > > remaining key and compare it. > > It is simple to implement in our current design as our current > > implementation of unsafe sort is also inspired from tungsten sort. > > > > Regards, > > Ravindra. > > > > On 22 May 2017 at 09:31, Jacky Li <[hidden email]> wrote: > > > >> For sorting, I think more optimization we can do, I am currently > thinking > >> these: > >> 1. Do not sort the whole TablePage, only KeyPage is required as the sort > >> key > >> > >> 2. Should find a more memory efficient sorting algorithm than > >> System.arraycopy which requires doubling space. > >> > >> 3. Should try to hold the KeyPage as well as the RowId in a compact data > >> structure, it is best if it fits in CPU cache. Modern L3 CPU is larger > than > >> 8MB. For this, I am thinking to have a 8 bytes encoded format that > includes > >> RowID and SORT_COLUMNS (partial or full), for example, 2 bytes for > RowId, > >> remaining 6 bytes for 2 to 3 columns after dictionary encoding. > >> a) If we can hold the RowID + whole SORT_COLUMNS in 8 bytes, it will > >> be most efficient to leverage CPU cache to do sorting, use in-place > update > >> approach while sorting. So no extra storage is needed, and the RowID + > >> whole SORT_COLUMNS will be sorted. > >> b) If we can only hold the RowID + partial SORT_COLUMNS in 8 bytes, > >> we can employ strategy like the sorting in Spark Tungsten project. > (first > >> compare the 8 bytes in cache, if it equals then compare remaining > bytes in > >> memory) > >> > >> Regards, > >> Jacky > >> > >>> 在 2017年5月22日,上午10:19,David CaiQiang <[hidden email]> 写道: > >>> > >>> As I known, System.arrayCopy of object array is a shallow copy, so I > >> think > >>> both KeyPage and TablePage maybe have the same performance on > >> Arrays.sort. > >>> > >>> > >>> ----- > >>> Best Regards > >>> David Cai > >>> -- > >>> View this message in context: http://apache-carbondata-dev-m > >> ailing-list-archive.1130556.n5.nabble.com/DISCUSS-Data-loadi > >> ng-improvement-tp11429p13056.html > >>> Sent from the Apache CarbonData Dev Mailing List archive mailing list > >> archive at Nabble.com. > >> > >> > > > > > > -- > > Thanks & Regards, > > Ravi > > > >
kumar vishal
|
Free forum by Nabble | Edit this page |