Re: [DISCUSSION]: (New Feature) Streaming Ingestion into CarbonData

Posted by Jacky Li on
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/DISCUSSION-New-Feature-Streaming-Ingestion-into-CarbonData-tp9724p9822.html

Hi Aniket,

Comment inline
And I have put some review comment in the PDF here: https://drive.google.com/file/d/0B5vjWGChUwXdSUV0OTFkTGE4am8/view?usp=sharing <https://drive.google.com/file/d/0B5vjWGChUwXdSUV0OTFkTGE4am8/view?usp=sharing>

> 在 2017年3月29日,上午7:10,Aniket Adnaik <[hidden email]> 写道:
>
> Hi Jacky,
>
> Please see my comments below;
> 1. In this phase, is it still using columnar format? Save to a file for
> every mini batch? If so, it is only readable after the file has been closed
> and some metadata need to be kept to indicate the availability of the new
> file.
>
> AA >> yes, for initial phase it will use default columnar format and save
> to file every mini batch. Closing of file may not be needed, as HDFS allows
> single writer-multiple readers. But yes it will require us to maintain a
> streaming_status file to let readers know about valid timestamp and offsets
> during getsplits.
>
> 2. How to map the partition concept in spark to files in streaming segment?
> I guess some small file will be created, right?
>
> AA>> In streaming context, writeStream.partitionBy() may require CarbonData
> to create separate folder for each partition.
> Folder may look like  \TableName\_Fact\part0\StreamingSegment\
> *partition_0\streamingfile.001*
> However, I am not sure how carbondata will utilize this partition info as
> my assumption is currently CarbonData does not support partitioning.Also, I
> am not sure if existing table with no partitioning schema can work well.
> This needs further analysis.
>

Currently carbon does not support partition yet, but we do have future plan for partitioning, for the bulkload scenario. The draft idea is to add a partition step before the input step in current loading pipeline framework. And the folder structure may look like: \TableName\Fact\part0\segment0. I will describe it in another thread. It think user can use the same partition key for both bulkload and streaming ingest.


> 3. Phase-2 : Add append support if not done in phase 1. Maintain append offsets
> and metadata information.
> Is the streaming data file format implemented in this phase?
> AA>>  I think we can directly leverage from existing V3 format without much
> changes in basic writer/reader framework, in that case implementing
> streaming file format is a possibility.
>
> Best Regards,
> Aniket
>
> On Tue, Mar 28, 2017 at 8:22 AM, Jacky Li <[hidden email]> wrote:
>
>> Hi Aniket,
>>
>> This feature looks great, the overall plan also seems fine to me. Thanks
>> for proposing it.
>> And I have some doubts inline.
>>
>>> 在 2017年3月27日,下午6:34,Aniket Adnaik <[hidden email]> 写道:
>>>
>>> Hi All,
>>>
>>> I would like to open up a discussion for new feature to support streaming
>>> ingestion in CarbonData.
>>>
>>> Please refer to design document(draft) in the link below.
>>>     https://drive.google.com/file/d/0B71_EuXTdDi8MlFDU2tqZU9BZ3M
>>> /view?usp=sharing
>>>
>>> Your comments/suggestions are welcome.
>>> Here are some high level points.
>>>
>>> Rationale:
>>> The current ways of adding user data to CarbonData table is via LOAD
>>> statement or using SELECT query with INSERT INTO statement. These methods
>>> add bulk of data into CarbonData table into a new segment. Basically, it
>> is
>>> a batch insertion for a bulk of data. However, with increasing demand of
>>> real time data analytics with streaming frameworks, CarbonData needs a
>> way
>>> to insert streaming data continuously into CarbonData table. CarbonData
>>> needs a support for continuous and faster ingestion into CarbonData table
>>> and make it available for querying.
>>>
>>> CarbonData can leverage from our newly introduced V3 format to append
>>> streaming data to existing carbon table.
>>>
>>>
>>> Requirements:
>>>
>>> Following are some high level requirements;
>>> 1.  CarbonData shall create a new segment (Streaming Segment) for each
>>> streaming session. Concurrent streaming ingestion into same table will
>>> create separate streaming segments.
>>>
>>> 2.  CarbonData shall use write optimized format (instead of multi-layered
>>> indexed columnar format) to support ingestion of streaming data into a
>>> CarbonData table.
>>>
>>> 3.  CarbonData shall create streaming segment folder and open a streaming
>>> data file in append mode to write data. CarbonData should avoid creating
>>> multiple small files by appending to an existing file.
>>>
>>> 4.  The data stored in new streaming segment shall be available for query
>>> after it is written to the disk (hflush/hsync). In other words,
>> CarbonData
>>> Readers should be able to query the data in streaming segment written so
>>> far.
>>>
>>> 5.  CarbonData should acknowledge the write operation status back to
>> output
>>> sink/upper layer streaming engine so that in the case of write failure,
>>> streaming engine should restart the operation and maintain exactly once
>>> delivery semantics.
>>>
>>> 6.  CarbonData Compaction process shall support compacting data from
>>> write-optimized streaming segment to regular read optimized columnar
>>> CarbonData format.
>>>
>>> 7.  CarbonData readers should maintain the read consistency by means of
>>> using timestamp.
>>>
>>> 8.  Maintain durability - in case of write failure, CarbonData should be
>>> able recover to latest commit status. This may require maintaining source
>>> and destination offsets of last commits in a metadata.
>>>
>>> This feature can be done in phases;
>>>
>>> Phase -1 : Add basic framework and writer support to allow Spark
>> Structured
>>> streaming into CarbonData . This phase may or may not have append
>> support.
>>> Add reader support to read streaming data files.
>>>
>> 1. In this phase, is it still using columnar format? Save to a file for
>> every mini batch? If so, it is only readable after the file has been closed
>> and some metadata need to be kept to indicate the availability of the new
>> file.
>> 2. How to map the partition concept in spark to files in streaming
>> segment? I guess some small file will be created, right?
>>
>>> Phase-2 : Add append support if not done in phase 1. Maintain append
>>> offsets and metadata information.
>>>
>> Is the streaming data file format implemented in this phase?
>>
>>> Phase -3 : Add support for external streaming frameworks such as Kafka
>>> streaming using spark structured steaming, maintain
>>> topics/partitions/offsets and support fault tolerance .
>>>
>>> Phase-4 : Add support to other streaming frameworks , such as flink ,
>> beam
>>> etc.
>>>
>>> Phase-5: Future support for in-memory cache for buffering streaming data,
>>> support for union with Spark Structured streaming to serve directly from
>>> spark structured streaming.  And add support for Time series data.
>>>
>>> Best Regards,
>>> Aniket
>>>
>>
>>
>>
>>