Discussion regarding design of data load after kettle removal.


Discussion regarding design of data load after kettle removal.

ravipesala
Hi All,


Removing Kettle from CarbonData is necessary, as this legacy Kettle framework has become an overhead for CarbonData. This discussion is regarding the design of the carbon data load without Kettle.

The main interface for data loading here is DataLoadProcessorStep.

/**
 * This is the base interface for data loading. It can do transformation jobs as per the
 * implementation.
 */
public interface DataLoadProcessorStep {

  /**
   * The output metadata for this step. The data returned from this step conforms to this meta.
   * @return output data fields of this step
   */
  DataField[] getOutput();

  /**
   * Initialization process for this step.
   * @param configuration
   * @param child
   * @throws CarbonDataLoadingException
   */
  void initialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
      CarbonDataLoadingException;

  /**
   * Transforms the data as per the implementation.
   * @return Iterator of data
   * @throws CarbonDataLoadingException
   */
  Iterator<Object[]> execute() throws CarbonDataLoadingException;

  /**
   * Any closing of resources after step execution can be done here.
   */
  void finish();
}

The implementation classes for DataLoadProcessorStep are InputProcessorStep, EncoderProcessorStep, SortProcessorStep and DataWriterProcessorStep.

The following picture depicts the loading process with implementation classes.



InputProcessorStep :  It does two jobs: 1. it reads data from the RecordReader of the InputFormat, and 2. it parses each column field as per its data type.
EncoderProcessorStep : It encodes each field with a dictionary where required, and combines all no-dictionary columns into a single byte array.
SortProcessorStep :   It sorts the data on the dimension columns and writes it to intermediate files.
DataWriterProcessorStep : It merge-sorts the data from the intermediate temp files, generates the MDK key, and writes the data in CarbonData format to the store.
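
To make the chaining concrete, here is a rough sketch (not part of the PR) of how a step could wrap the iterator returned by its child. The AbstractProcessorStep name and the transform() hook are assumptions for illustration only:

import java.util.Iterator;

/**
 * Illustrative sketch only: a step that pulls rows lazily from its child and
 * applies a step-specific transformation to each row.
 */
public abstract class AbstractProcessorStep implements DataLoadProcessorStep {

  protected CarbonDataLoadConfiguration configuration;
  protected DataLoadProcessorStep child;

  @Override
  public void initialize(CarbonDataLoadConfiguration configuration,
      DataLoadProcessorStep child) throws CarbonDataLoadingException {
    this.configuration = configuration;
    this.child = child;
  }

  @Override
  public DataField[] getOutput() {
    // By default expose the child's schema unchanged; steps that change the
    // schema would override this.
    return child.getOutput();
  }

  @Override
  public Iterator<Object[]> execute() throws CarbonDataLoadingException {
    final Iterator<Object[]> childIterator = child.execute();
    return new Iterator<Object[]>() {
      @Override public boolean hasNext() {
        return childIterator.hasNext();
      }
      @Override public Object[] next() {
        // Apply this step's logic to every row coming from the child step.
        return transform(childIterator.next());
      }
      @Override public void remove() {
        throw new UnsupportedOperationException("remove");
      }
    };
  }

  /** Step-specific row transformation, e.g. dictionary encoding. */
  protected abstract Object[] transform(Object[] row);

  @Override
  public void finish() {
    if (child != null) {
      child.finish();
    }
  }
}

With such a base class, a step like EncoderProcessorStep would only need to implement transform().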



The following is the interface for dictionary generation.

/**
 * Generates the dictionary for a column. The implementation classes can be pre-defined,
 * local or global dictionary generators.
 */
public interface ColumnDictionaryGenerator {

  /**
   * Generates dictionary value for the column data
   * @param data
   * @return dictionary value
   */
  int generateDictionaryValue(Object data);

  /**
   * Returns the actual value associated with dictionary value.
   * @param dictionary
   * @return actual value.
   */
  Object getValueFromDictionary(int dictionary);

  /**
   * Returns the maximum value among the dictionary values. It is used for generating the MDK key.
   * @return max dictionary value.
   */
  int getMaxDictionaryValue();

}

This ColumnDictionaryGenerator interface can have three implementations: 1. PreGeneratedColumnDictionaryGenerator, 2. GlobalColumnDictionaryGenerator, and 3. LocalColumnDictionaryGenerator.



PreGeneratedColumnDictionaryGenerator : It gets dictionary values from an already generated and loaded dictionary.
GlobalColumnDictionaryGenerator : It generates a global dictionary online by using a KV store or a distributed map.
LocalColumnDictionaryGenerator : It generates a dictionary locally, only for that executor.
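
As an illustration of how the interface could be implemented, below is a minimal sketch of a local (per-executor) generator. The in-memory maps and counter used here are assumptions for the sketch, not the actual implementation:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Illustrative sketch of a local dictionary generator: values are assigned
 * from an incrementing counter and kept in in-memory maps for this executor.
 */
public class LocalColumnDictionaryGenerator implements ColumnDictionaryGenerator {

  private final Map<Object, Integer> valueToDictionary = new ConcurrentHashMap<>();
  private final Map<Integer, Object> dictionaryToValue = new ConcurrentHashMap<>();
  private final AtomicInteger counter = new AtomicInteger(0);

  @Override
  public int generateDictionaryValue(Object data) {
    // Reuse the existing dictionary value if this data was already seen.
    Integer existing = valueToDictionary.get(data);
    if (existing != null) {
      return existing;
    }
    synchronized (this) {
      existing = valueToDictionary.get(data);
      if (existing == null) {
        existing = counter.incrementAndGet();
        valueToDictionary.put(data, existing);
        dictionaryToValue.put(existing, data);
      }
      return existing;
    }
  }

  @Override
  public Object getValueFromDictionary(int dictionary) {
    return dictionaryToValue.get(dictionary);
  }

  @Override
  public int getMaxDictionaryValue() {
    // The counter holds the highest dictionary value handed out so far.
    return counter.get();
  }
}

A global generator would follow the same shape, but back the maps with the KV store or distributed map mentioned above.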


For more information on the loading, please check the PR https://github.com/apache/incubator-carbondata/pull/215

Please let me know if any changes are required in these interfaces.

--
Thanks & Regards,
Ravi

Re: Discussion regarding design of data load after kettle removal.

kumarvishal09
Hi Ravi,
We can move the MDK key generation step before sorting; this will compress the
dictionary data and reduce the IO.
-Regards
Kumar Vishal


Re: Discussion regarding design of data load after kettle removal.

ravipesala
Hi Vishal,

You are right, but that is possible only if the dictionary is already
generated and the cardinality of each column is already known, so it works in
the present solution. But if we want a single-pass data loading solution, then
we need to generate the global dictionary online (by using a KV store or a
distributed map), and in that case generating the MDK key before the sort step
is not possible.

Regards,
Ravi


Re: Discussion regarding design of data load after kettle removal.

Jacky Li
In reply to this post by ravipesala
Hi Ravindra,

It seems the picture is missing; can you upload it somewhere and share the link?

Regards,
Jacky

Re: Discussion regarding design of data load after kettle removal.

Jacky Li
In reply to this post by ravipesala
Hi Ravindra,

I have the following questions:

1. How does the DataLoadProcessorStep interface work? For each step, will it call its child step to execute and apply its own logic to the returned iterator of the child? And how does it map to OutputFormat in the Hadoop interface?

2. This step interface relies on an iterator to do the encoding row by row; will it be convenient to add batch encoder support now or later?

3. For the dictionary part, besides the generator I think it is better to also consider the interface for reading the dictionary while querying. Are you planning to use the same interface? If so, it is not just a generator.
If the dictionary interface is well designed, other developers can also add new dictionary types. For example:
- assigning dictionary values based on usage frequency, for better compression, similar to Huffman encoding
- an order-preserving dictionary, which can do range filters on dictionary values directly

Regards,
Jacky

Re: Discussion regarding design of data load after kettle removal.

ravipesala
Hi Jacky,

https://drive.google.com/open?id=0B4TWTVbFSTnqeElyWko5NDlBZkdxS3NrMW1PZndzMG5ZM2Y0


1. Yes, it calls the child step to execute and applies its logic to the
returned iterator, just like Spark SQL. For CarbonOutputFormat it will use
RecordBufferedWriterIterator and collect the data in batches.
https://drive.google.com/open?id=0B4TWTVbFSTnqTF85anlDOUQ5S1BqYzFpLWcwZnBLSVVqSWpj

2. Yes, this interface relies on processing row by row, but we can also
execute in batches within the iterator (a rough sketch follows below).

3. Yes, the dictionary interface is used for reading the dictionary while
querying. Based on my understanding I have added this interface; we can
discuss it more and update the interface.
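
For point 2, a minimal sketch of how batching could sit on top of the same row iterator. BatchIterator and its batch size are hypothetical here and are not the actual RecordBufferedWriterIterator:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Illustrative sketch: wraps a row iterator and exposes the rows in
 * fixed-size batches, so a downstream step can encode or write batch-wise.
 */
public class BatchIterator implements Iterator<List<Object[]>> {

  private final Iterator<Object[]> rowIterator;
  private final int batchSize;

  public BatchIterator(Iterator<Object[]> rowIterator, int batchSize) {
    this.rowIterator = rowIterator;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext() {
    return rowIterator.hasNext();
  }

  @Override
  public List<Object[]> next() {
    // Collect up to batchSize rows from the underlying row iterator.
    List<Object[]> batch = new ArrayList<>(batchSize);
    while (rowIterator.hasNext() && batch.size() < batchSize) {
      batch.add(rowIterator.next());
    }
    return batch;
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("remove");
  }
}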


Regards,
Ravi


Re: Discussion regarding design of data load after kettle removal.

Jacky Li
Hi Ravindra,

Regarding the design (https://drive.google.com/file/d/0B4TWTVbFSTnqTF85anlDOUQ5S1BqYzFpLWcwZnBLSVVqSWpj/view), I have the following questions:

1. In SortProcessorStep, I think it is better to also include the merge sort in this step, so that it contains all the logic for sorting. In that case, a developer can implement an external sort (spilling to files only when necessary), and the loading process becomes an on-line sort if memory is sufficient. I think it will improve loading performance a lot.

2. In EncoderProcessorStep, apart from the dictionary encoding, what other processing will it do? How about delta, RLE, etc.?

3. In InputProcessorStep, it needs some schema definition to parse the input and convert it to a row, right? For example, how would it read from a JSON or AVRO file?

Regards,
Jacky

Re: Discussion regarding design of data load after kettle removal.

ravipesala
Hi Jacky,

1. Yes. It is better to keep all the sorting logic in one step so that other
types of sorts can be implemented easily. I will update the design.

2. EncoderProcessorStep can do dictionary encoding and convert no-dictionary
and complex types to a byte[] representation. Here the encoding interface is
flexible, so the user can provide a different encoding representation, but at
the row level only. RLE, delta and heavy compression are done only in
DataWriterProcessorStep, because these encodings/compressions happen at the
blocklet level, not the row level.

3. Yes, each step requires a schema definition; it is passed as DataField[]
through the configuration to the initial step, InputProcessorStep. The
remaining steps can call child.getOutput() to get the schema. Each DataField
represents one column (a rough wiring sketch follows below).
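
As an illustration of that wiring, a rough sketch only: setDataFields() and passing null as the child of the first step are assumptions, not the actual API from the PR:

// Illustrative wiring of the proposed steps; driving the last step pulls
// rows through the whole chain.
void runLoad(CarbonDataLoadConfiguration configuration, DataField[] schemaFields)
    throws CarbonDataLoadingException {
  configuration.setDataFields(schemaFields);  // assumed setter carrying the column schema

  DataLoadProcessorStep input = new InputProcessorStep();
  DataLoadProcessorStep encoder = new EncoderProcessorStep();
  DataLoadProcessorStep sort = new SortProcessorStep();
  DataLoadProcessorStep writer = new DataWriterProcessorStep();

  // Each step is initialized with its child; downstream steps read the schema
  // via child.getOutput().
  input.initialize(configuration, null);
  encoder.initialize(configuration, input);
  sort.initialize(configuration, encoder);
  writer.initialize(configuration, sort);

  java.util.Iterator<Object[]> rows = writer.execute();
  while (rows.hasNext()) {
    rows.next();  // in reality the writer step persists the rows as it iterates
  }
  writer.finish();
}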

Regards,
Ravi
