[DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

[DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Aniket Adnaik
Hi All,

I would like to open a discussion on a new feature to support streaming
ingestion in CarbonData.

Please refer to the design document (draft) at the link below.
      https://drive.google.com/file/d/0B71_EuXTdDi8MlFDU2tqZU9BZ3M/view?usp=sharing

Your comments/suggestions are welcome.
Here are some high-level points.

Rationale:
The current ways of adding user data to a CarbonData table are the LOAD
statement and INSERT INTO with a SELECT query. These methods add a bulk of
data to the CarbonData table as a new segment; essentially, each is a batch
insertion of a bulk of data. However, with the increasing demand for real-time
data analytics with streaming frameworks, CarbonData needs a way to insert
streaming data continuously into a CarbonData table: continuous, faster
ingestion that makes the data available for querying.
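
For reference, this is roughly what the existing batch paths look like from a
user's point of view (a minimal Spark SQL sketch; the table and file names are
placeholders, not taken from the design document):

import org.apache.spark.sql.SparkSession

object BatchLoadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-batch-load").getOrCreate()

    // LOAD DATA: bulk-loads a file and creates a new segment in the table.
    spark.sql("LOAD DATA INPATH 'hdfs://ns/data/sales.csv' INTO TABLE sales_carbon")

    // INSERT INTO ... SELECT: also lands as a new segment.
    spark.sql("INSERT INTO TABLE sales_carbon SELECT * FROM sales_staging")

    spark.stop()
  }
}

Each such statement produces a whole new segment, which is exactly the batch
behavior that streaming ingestion is meant to complement.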

CarbonData can leverage the newly introduced V3 format to append streaming
data to an existing carbon table.


Requirements:

Following are some high-level requirements:
1.  CarbonData shall create a new segment (Streaming Segment) for each
streaming session. Concurrent streaming ingestion into the same table will
create separate streaming segments.

2.  CarbonData shall use a write-optimized format (instead of the multi-layered
indexed columnar format) to support ingestion of streaming data into a
CarbonData table.

3.  CarbonData shall create a streaming segment folder and open a streaming
data file in append mode to write data. CarbonData should avoid creating
multiple small files by appending to an existing file.

4.  The data stored in the new streaming segment shall be available for query
once it has been written to disk (hflush/hsync). In other words, CarbonData
readers should be able to query the data written to the streaming segment so
far (see the sketch after this list).

5.  CarbonData should acknowledge the write operation status back to the
output sink / upper-layer streaming engine so that, in case of a write failure,
the streaming engine can restart the operation and maintain exactly-once
delivery semantics.

6.  The CarbonData compaction process shall support compacting data from the
write-optimized streaming segment into the regular read-optimized columnar
CarbonData format.

7.  CarbonData readers should maintain read consistency by using timestamps.

8.  Maintain durability: in case of a write failure, CarbonData should be able
to recover to the latest commit status. This may require maintaining the
source and destination offsets of the last commits in metadata.
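
To make requirement 4 concrete, here is a minimal sketch of the HDFS
flush-for-visibility behavior it relies on, using the plain Hadoop FileSystem
API (the file path below is a hypothetical streaming-segment layout, not the
final design):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HflushSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical streaming data file inside a streaming segment folder.
    val file = new Path("/warehouse/t1/Fact/Part0/Streaming_0/streamingfile.001")
    val out = if (fs.exists(file)) fs.append(file) else fs.create(file)

    out.write("encoded rows of one mini-batch".getBytes("UTF-8"))
    // hflush makes the bytes visible to new readers without closing the file;
    // hsync additionally forces them to disk on the datanodes.
    out.hflush()
    // A reader that opens the file now can see everything written so far.
    out.close()
  }
}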

This feature can be done in phases:

Phase 1: Add the basic framework and writer support to allow Spark Structured
Streaming to write into CarbonData. This phase may or may not include append
support. Add reader support to read streaming data files. (A minimal writer
sketch follows this phase list.)

Phase 2: Add append support if not done in Phase 1. Maintain append offsets
and metadata information.

Phase 3: Add support for external streaming frameworks, such as Kafka streaming
via Spark Structured Streaming; maintain topics/partitions/offsets and support
fault tolerance.

Phase 4: Add support for other streaming frameworks, such as Flink, Beam, etc.

Phase 5: Future support for an in-memory cache for buffering streaming data,
support for union with Spark Structured Streaming to serve results directly
from the stream, and support for time-series data.
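
As a rough illustration of the Phase 1 user experience, the sketch below writes
a Spark Structured Streaming query into a CarbonData table. The "carbondata"
sink name and the dbName/tableName options are assumptions for illustration
only; the actual sink name and options will be defined by the implementation.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StreamingIngestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-streaming-ingest").getOrCreate()

    // Socket source is one of the sources proposed for the first phase.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9099)
      .load()

    // Assumed sink name and options (not final): each mini-batch is written
    // into the table's streaming segment.
    val query = lines.writeStream
      .format("carbondata")
      .option("checkpointLocation", "hdfs://ns/checkpoints/sales_stream")
      .option("dbName", "default")
      .option("tableName", "sales_carbon")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}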

Best Regards,
Aniket

Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Jacky Li
Hi Aniket,

This feature looks great, and the overall plan also seems fine to me. Thanks for proposing it.
I have some questions inline.

> On Mar 27, 2017, at 6:34 PM, Aniket Adnaik <[hidden email]> wrote:
> [...]
> Phase -1 : Add basic framework and writer support to allow Spark Structured
> streaming into CarbonData . This phase may or may not have append support.
> Add reader support to read streaming data files.
>
1. In this phase, is it still using the columnar format? Is a file saved for every mini-batch? If so, it is only readable after the file has been closed, and some metadata needs to be kept to indicate the availability of the new file.
2. How is the partition concept in Spark mapped to files in the streaming segment? I guess some small files will be created, right?

> Phase-2 : Add append support if not done in phase 1. Maintain append
> offsets and metadata information.
>
Is the streaming data file format implemented in this phase?



Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Aniket Adnaik
Hi Jacky,

Please see my comments below.
1. In this phase, is it still using the columnar format? Save to a file for
every mini-batch? If so, it is only readable after the file has been closed,
and some metadata needs to be kept to indicate the availability of the new
file.

AA >> Yes, for the initial phase it will use the default columnar format and
save to a file every mini-batch. Closing the file may not be needed, as HDFS
allows a single writer with multiple readers. But yes, it will require us to
maintain a streaming_status file to let readers know the valid timestamps and
offsets during getSplits.
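
Purely as an illustration of what such a streaming_status file could carry (all
names and fields below are hypothetical, not part of the design document):

// One possible shape for the streaming_status metadata that readers consult
// while building splits. Field names are hypothetical.
case class StreamingFileStatus(
    fileName: String,       // e.g. "streamingfile.001"
    committedOffset: Long,  // last hflushed byte offset that is safe to read
    commitTimestamp: Long)  // time of the last successful commit

case class StreamingSegmentStatus(
    segmentId: String,
    files: Seq[StreamingFileStatus])

object StreamingStatusExample {
  def main(args: Array[String]): Unit = {
    val status = StreamingSegmentStatus(
      segmentId = "Streaming_0",
      files = Seq(StreamingFileStatus("streamingfile.001",
        committedOffset = 1048576L,
        commitTimestamp = System.currentTimeMillis())))

    // getSplits would cap each split at committedOffset so that only fully
    // committed data is exposed to queries.
    status.files.foreach(f => println(s"${f.fileName} -> readable up to offset ${f.committedOffset}"))
  }
}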

2. How to map the partition concept in Spark to files in the streaming segment?
I guess some small files will be created, right?

AA>> In the streaming context, writeStream.partitionBy() may require CarbonData
to create a separate folder for each partition.
The folder may look like \TableName\_Fact\part0\StreamingSegment\partition_0\streamingfile.001
However, I am not sure how CarbonData will utilize this partition info, as my
assumption is that CarbonData currently does not support partitioning. Also, I
am not sure whether an existing table with no partitioning schema can work
well. This needs further analysis.
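
A small sketch of how writeStream.partitionBy() would look from the user side;
the "carbondata" sink name, the tableName option, and the resulting folder
layout are assumptions that depend on the open partitioning questions above:

import org.apache.spark.sql.SparkSession

object PartitionedStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-partitioned-stream").getOrCreate()

    // Parse a "country,payload" line from a socket source into two columns.
    val events = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9099).load()
      .selectExpr("split(value, ',')[0] AS country", "split(value, ',')[1] AS payload")

    events.writeStream
      .format("carbondata")                 // assumed sink name
      .partitionBy("country")               // one sub-folder per partition value
      .option("checkpointLocation", "hdfs://ns/checkpoints/events_stream")
      .option("tableName", "events_carbon") // assumed option
      .start()
      .awaitTermination()

    // Hypothetical resulting layout:
    //   \TableName\_Fact\part0\StreamingSegment\partition_0\streamingfile.001
  }
}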

3. Phase-2: Add append support if not done in Phase 1. Maintain append offsets
and metadata information.
Is the streaming data file format implemented in this phase?
AA>> I think we can directly leverage the existing V3 format without many
changes to the basic writer/reader framework; in that case, implementing a
streaming file format is a possibility.

Best Regards,
Aniket


Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Liang Chen
In reply to this post by Aniket Adnaik
Hi Aniket,

Thanks for your great contribution. The feature of ingesting streaming data into CarbonData would be very useful for some real-time query scenarios.

Some inputs from my side:

1. I agree with approach 2 for the streaming file format; query performance must be ensured.
2. Will compaction be supported for streaming-ingested data in order to add the index, or not?
--------------------------------------------------------------------------------------------
CarbonData shall use write optimized format (instead of multi-layered
indexed columnar format) to support ingestion of streaming data into a
CarbonData table.

3. For the first version of the streaming ingestion feature, which kinds of streaming processing systems will be supported?
Structured Streaming and Kafka? Any others?

Regards
Liang


Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Jacky Li
In reply to this post by Aniket Adnaik
Hi Aniket,

Comments inline.
I have also put some review comments in the PDF here: https://drive.google.com/file/d/0B5vjWGChUwXdSUV0OTFkTGE4am8/view?usp=sharing

> On Mar 29, 2017, at 7:10 AM, Aniket Adnaik <[hidden email]> wrote:
>
> [...]
>
> 2. How to map the partition concept in spark to files in streaming segment?
> I guess some small file will be created, right?
>
> AA>> In streaming context, writeStream.partitionBy() may require CarbonData
> to create separate folder for each partition.
> Folder may look like  \TableName\_Fact\part0\StreamingSegment\
> *partition_0\streamingfile.001*
> However, I am not sure how carbondata will utilize this partition info as
> my assumption is currently CarbonData does not support partitioning.Also, I
> am not sure if existing table with no partitioning schema can work well.
> This needs further analysis.
>

Currently carbon does not support partitioning yet, but we do have a future plan for partitioning, for the bulk-load scenario. The draft idea is to add a partition step before the input step in the current loading pipeline framework, and the folder structure may look like \TableName\Fact\part0\segment0. I will describe it in another thread. I think users can use the same partition key for both bulk load and streaming ingest.




Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Aniket Adnaik
In reply to this post by Liang Chen
Hi Liang,

Thanks. Please see my comments on your questions below.

2. Will compaction be supported for streaming-ingested data in order to add the
index, or not?
AA>> Yes, eventually we would need the streaming data files to be compacted
into the regular read-optimized CarbonData format. Triggering of compaction can
be based on the number of files in the streaming segment.
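
A minimal sketch of such a file-count-based trigger (the threshold value and
the segment path layout are hypothetical; the real policy would come from a
table or system property):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StreamingCompactionPolicy {
  // Hypothetical threshold; in practice this would come from configuration.
  val fileCountThreshold: Int = 10

  // True when the streaming segment has accumulated enough files to be worth
  // compacting into the regular read-optimized columnar format.
  def shouldCompact(fs: FileSystem, streamingSegment: Path): Boolean =
    fs.listStatus(streamingSegment).count(_.isFile) >= fileCountThreshold

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val segment = new Path("/warehouse/t1/Fact/Part0/Streaming_0")
    if (shouldCompact(fs, segment)) {
      println(s"Trigger compaction of $segment into read-optimized format")
    }
  }
}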

3. For the first version of the streaming ingestion feature, which kinds of
streaming processing systems will be supported? Structured Streaming and Kafka?
Any others?
AA>> For the first phase we can support the file source and socket source. For
Kafka as a streaming source, there are some additional functionalities that
need to be covered, such as partitioning, Kafka offset management, and
consistency with carbon streaming ingestion, so we may defer it to a later phase.

Best Regards,
Aniket



Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Aniket Adnaik
In reply to this post by Jacky Li
Hi Jacky,

Thanks for your comments. I guess I should have uploaded the document in Google
Docs format instead of PDF; somehow Google Docs messes up all the diagrams when
I copy-paste, and I have not figured out a way to fix it. Anyway, I apologize
for the inconvenience to those who wanted to add inline comments in the
document. For now, I will try to address your questions through email below,
and I will see if I can upload a commentable version in Google Docs.

Context: The table status file in the metadata folder will be used to indicate
the status of streaming ingestion, such as in-progress or successful.
Jacky>> Does this mean the status file needs to be rewritten for every mini-batch?
AA>> Currently yes; this may not be so efficient. We can think about keeping
this info in the metastore. Let me know if you have any ideas.
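
One possible way to keep the per-mini-batch status update safe is the usual
write-temp-then-swap pattern sketched below; this is only an assumption for
discussion, not CarbonData's actual table status handling:

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TableStatusRewriteSketch {
  // Write the new status to a temp file first, then swap it into place, so a
  // crash mid-write never leaves readers with a half-written status file.
  def rewriteStatus(fs: FileSystem, statusFile: Path, content: String): Unit = {
    val tmp = new Path(statusFile.getParent, statusFile.getName + ".tmp")
    val out = fs.create(tmp, /* overwrite = */ true)
    out.write(content.getBytes(StandardCharsets.UTF_8))
    out.close()

    // The swap is two metadata operations on HDFS, so concurrent writers still
    // need the lock discussed further below.
    fs.delete(statusFile, false)
    fs.rename(tmp, statusFile)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    rewriteStatus(fs, new Path("/warehouse/t1/Metadata/tablestatus"),
      """{"streamingSegment": "Streaming_0", "status": "in-progress"}""")
  }
}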

Context: Only one writer (executor) is allowed to write to a streaming data
file.
Jacky>> Every writer writes to one file, but there could be parallel writers
writing to different files, right?
AA>> Yes, there could be parallel executors writing to different files.

Context: BlockHeader
Jacky>> In BlockletHeader, I think two more fields need to be added:
i64 blocklet_offset and list<DataChunk3> datachunks; DataChunk3 contains
the min/max of each column in each page. Mutation, blockletindex,
blockletinfo, and dictionary are not required.
AA>> Yes, this probably needs more refinement and discussion. I was
thinking more along the lines of using the existing V3 format, rather than
adding a new one.

Context: Approach-2 for the file format
Jacky>> Is this metadata file appendable? It should not have a footer then.
And how do we maintain the locality of this file and the stream file together?
AA>> Yes, the metadata file will be appendable. The footer will be added when
the file is complete. Ideally, co-location with the base streaming file would
be the best case; I am not sure whether the HDFS data placement policy provides
any configuration for this.

Context: Write flow diagram
Jacky>> 1. In Structured Streaming, doesn't the executor receive events
directly from the streaming source?
AA>> Yes. After the receiver is set up, the driver will have a
StreamingQueryListener to communicate with the executors. I will add arrows
from the source to the executors to make this clearer.
Jacky>> 2. Is the metadata protected by some lock? How do two executors write
to it simultaneously?
AA>> Yes, the metadata will be protected by a lock. Again, we need to explore a
more efficient way if there is one.

Context: Read consistency
Jacky>> I think more analysis is needed here; what about a query consisting of
two scan operations in different stages?
AA>> Need to check on that. My assumption is that we have only one query start
timestamp, which can be utilized.

Context: Compaction
Jacky>> Can we have some policy so that the user does not need to trigger it
manually?
AA>> Yes, this needs to be configurable based on the number of streaming files.

Context: Spark Structured Streaming info/background, "No aggregation supported"
Jacky>> You mean no aggregate query is allowed?
AA>> This limit is on the writer side: Spark writeStream with a file sink (for
example, Parquet) does not support performing aggregation before writing to the
file sink. Once the data is written, it can be read with aggregate queries.

Best Regards,
Aniket


Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

ZhuWilliam
In reply to this post by Aniket Adnaik
The design is too complex, and we may spend too much time and too many people developing it. Can we simplify it and just support basic streaming first?