[GitHub] [carbondata] niuge01 opened a new pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide


[GitHub] [carbondata] niuge01 opened a new pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox

niuge01 opened a new pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752


    ### Why is this PR needed?
    Provide an end-to-end guide to help users understand and use the flink integration module.
   
    ### What changes were proposed in this PR?
    Add file docs/flink-integration-guide.md
       
    ### Does this PR introduce any user interface change?
    - No
   
    ### Is any new testcase added?
    - No
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox

CarbonDataQA1 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625050735


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1248/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625052127


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2966/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

Indhumathi27 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421233356



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios

Review comment:
       Please add license header
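
   For reference, CarbonData's markdown docs typically carry the standard ASF license header as an HTML comment at the very top of the file; a sketch of what would be added here:

   ```
   <!--
       Licensed to the Apache Software Foundation (ASF) under one or more
       contributor license agreements.  See the NOTICE file distributed with
       this work for additional information regarding copyright ownership.
       The ASF licenses this file to you under the Apache License, Version 2.0
       (the "License"); you may not use this file except in compliance with
       the License.  You may obtain a copy of the License at

           http://www.apache.org/licenses/LICENSE-2.0

       Unless required by applicable law or agreed to in writing, software
       distributed under the License is distributed on an "AS IS" BASIS,
       WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
       implied.  See the License for the specific language governing
       permissions and limitations under the License.
   -->
   ```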

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios

Review comment:
       ```suggestion
   ## Usage scenarios
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description

Review comment:
       ```suggestion
   ## Usage description
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process

Review comment:
       ```suggestion
   ### Writing process
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties

Review comment:
       ```suggestion
   ### Writer properties
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer

Review comment:
       ```suggestion
   #### Local Writer
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer

Review comment:
       ```suggestion
   #### S3 Writer
   ```
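
   As a usage note (not part of the reviewed file): the local writer properties listed above would be set on `writerProperties` before building the factory. A minimal sketch, assuming the `CarbonLocalProperty` constants live in `org.apache.carbon.flink` alongside `CarbonWriterFactory`:

   ```scala
   import java.util.Properties
   import org.apache.carbon.flink.CarbonLocalProperty // assumed package

   val writerProperties = new Properties
   // Data is first written to this temporary path, then moved into the table stage directory.
   writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, "/data/temp/")
   // Flush and move the data once this many rows have been written (illustrative value).
   writerProperties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "1024")
   ```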

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage

Review comment:
       ```suggestion
   ### Insert from stage
   ```
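
   Similarly, a hedged sketch (not from the reviewed file) of the S3 writer configuration: the factory would be built with the "S3" type mentioned in the guide, and `writerProperties` would carry the credentials and endpoint, assuming `CarbonS3Property` sits in the same `org.apache.carbon.flink` package:

   ```scala
   import java.util.Properties
   import org.apache.carbon.flink.CarbonS3Property // assumed package

   val writerProperties = new Properties
   writerProperties.setProperty(CarbonS3Property.ACCESS_KEY, "<access-key>")
   writerProperties.setProperty(CarbonS3Property.SECRET_KEY, "<secret-key>")
   writerProperties.setProperty(CarbonS3Property.ENDPOINT, "<s3-endpoint>")
   writerProperties.setProperty(CarbonS3Property.DATA_TEMP_PATH, "/data/temp/")
   writerProperties.setProperty(CarbonS3Property.COMMIT_THRESHOLD, "1024")
   // The factory type would then be "S3" instead of "Local":
   // val writerFactory = CarbonWriterFactory.builder("S3").build(...)
   ```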

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.

Review comment:
       ```suggestion
       // 1. Create a new flink execution environment.
   ```
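
   To make the "Set flink environment configuration here" comment in the quoted code concrete, a minimal sketch using standard Flink APIs (values are illustrative only; `RestartStrategies` is already imported in the guide's example):

   ```scala
   // Parallelism of the whole job.
   environment.setParallelism(1)
   // Checkpoint every 2 seconds so in-flight data is flushed regularly.
   environment.enableCheckpointing(2000L)
   // Fail fast instead of restarting, for the purpose of this example.
   environment.setRestartStrategy(RestartStrategies.noRestart())
   ```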

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage
+
+  [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
+
+##Usage Example Code

Review comment:
       ```suggestion
   ## Usage Example Code
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.

Review comment:
       ```suggestion
     The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.

Review comment:
       ```suggestion
     after the `INSERT INTO $tableName STAGE` command is executed.
   ```
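
   As a usage note: with the example table from the guide, the staged data would become queryable after running that command through Spark SQL, e.g. (assuming an active `SparkSession` named `spark`):

   ```scala
   // Make the data written to the stage directory of table `test` visible to queries.
   spark.sql("INSERT INTO test STAGE")
   ```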

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |

Review comment:
       ```suggestion
     | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and move to target data path finally.        |
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage
+
+  [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)

Review comment:
       ```suggestion
     Refer [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files) for syntax.
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |

Review comment:
       ```suggestion
     | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and move to target data path finally.        |
   ```

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, those data in table stage directory, can not be immediately queried, those data can be queried
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to ensure the visibility of data
+  and the controllable amount of data processed during the execution of each insert form stage command,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert form stage command should take the data visibility requirements
+  of the actual business and the flink data traffic. When the data visibility requirements are high
+  or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)

Review comment:
       ```suggestion
     Typical flink stream: `Source -> Process -> Output(Carbon Writer Sink)`
   ```
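
   For the `Source` step, any Flink source that emits `Array[AnyRef]` rows in table-column order will do. A minimal hand-rolled sketch, assuming a hypothetical two-column table `(id, name)`:

   ```scala
   import org.apache.flink.streaming.api.functions.source.SourceFunction

   // Hypothetical source: emits one row per second until cancelled.
   class SampleSource extends SourceFunction[Array[AnyRef]] {
     @volatile private var running = true

     override def run(context: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
       var id = 0L
       while (running) {
         // Array length and value order must match the target table's columns.
         context.collect(Array[AnyRef](java.lang.Long.valueOf(id), s"name_$id"))
         id += 1
         Thread.sleep(1000)
       }
     }

     override def cancel(): Unit = running = false
   }

   // val source = new SampleSource
   ```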

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage
+
+  [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
+
+##Usage Example Code
+
+  Writing flink data to local carbon table.
+
+  ```scala
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonLocalProperty
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+    import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+    val databaseName = "default"
+    val tableName = "test"
+    val tablePath = "/data/warehouse/test"
+    val dataTempPath = "/data/temp/"
+
+    val tableProperties = new Properties
+
+    val writerProperties = new Properties
+    writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
+
+    val carbonProperties = new Properties
+    carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    environment.setParallelism(1)
+    environment.enableCheckpointing(2000L)
+    environment.setRestartStrategy(RestartStrategies.noRestart)
+
+    // Define a custom source.
+    val source = new SourceFunction[Array[AnyRef]]() {
+      override
+      def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
+        // Array length should equals to table column count, and values order in array should matches table column order.
+        val data = new Array[AnyRef](3)
+        data(0) = "value1"
+        data(1) = "value2"
+        data(2) = "value3"
+        sourceContext.collect(data)
+      }
+
+      override
+      def cancel(): Unit = {
+        // do something.
+      }
+    }
+
+    val stream = environment.addSource(source)
+    val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
+
+    stream.addSink(streamSink)
+
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // TODO
+        throw new UnsupportedOperationException(exception)
+    }

Review comment:
       Add Insert stage command in the example
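For context, the command being asked for would look roughly like the following for the example table; the SparkSession `spark` is an assumption, while the statement follows the `INSERT INTO $tableName STAGE` form described earlier in the guide:

```scala
// Run after the Flink job has written files into the stage directory of table `test`,
// so that the staged data becomes queryable.
spark.sql("INSERT INTO test STAGE")
```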

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios

Review comment:
       Can add title `# Carbon Flink Integration guide`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] Indhumathi27 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

Indhumathi27 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625278288


   @niuge01 Please add a link in readme.md file


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421890386



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios

Review comment:
       OK

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421890531



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description

Review comment:
       OK

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421890603



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties

Review comment:
       OK
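The pseudo code quoted above notes that the source may be a Kafka source and that it must emit `Array[AnyRef]` rows whose length and value order match the table columns. As an illustration only (the connector class, topic and CSV parsing are assumptions, not part of the guide, and presume a Flink version that still ships `FlinkKafkaConsumer`), such a source could be wired up roughly like this:

```scala
import java.util.Properties

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val environment = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaProperties = new Properties
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092")
kafkaProperties.setProperty("group.id", "carbon-flink-writer")

// Each Kafka record is assumed to be a CSV line with one field per table column.
val consumer = new FlinkKafkaConsumer[String]("test_topic", new SimpleStringSchema(), kafkaProperties)

val stream = environment
  .addSource(consumer)
  .map(new MapFunction[String, Array[AnyRef]] {
    // Length and value order of the array must match the table columns.
    override def map(line: String): Array[AnyRef] =
      line.split(",").map(_.asInstanceOf[AnyRef])
  })
```

The resulting `stream` can then be given the same carbon writer sink shown in the pseudo code.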




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421890931



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421890991



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer

Review comment:
       OK

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921350



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+  | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage
+
+  [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
+
+##Usage Example Code

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921411



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921602



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921906



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] niuge01 commented on a change in pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921948



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create flink data source, may be a kafka source, custom source, or others.
+    // The data type of source should be Array[AnyRef].
+    // Array length should equals to table column count, and values order in array should matches table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
+
+###Writer properties
+
+####Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |

Review comment:
       OK

##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+  for subsequent analysis and queries.
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into
+  the stage directory of the target table by the CarbonXXXWriter.
+
+  By default, the data in the table stage directory cannot be queried immediately; it becomes queryable
+  after the "INSERT INTO $tableName STAGE" command is executed.
+
+  Since the flink data written to carbon is endless, in order to keep the data visible
+  and the amount of data processed by each insert from stage command under control,
+  the user should execute the insert from stage command in a timely manner.
+
+  The execution interval of the insert from stage command should take into account the data visibility
+  requirements of the actual business and the flink data traffic. When the data visibility requirements
+  are high or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description:
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+  
+    // Specify database name.
+    val databaseName = "default"
+  
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+  
+    val tableProperties = new Properties
+    // Set the table properties here.
+  
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+  
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+    
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+    
+    // Build a flink stream and run it.
+    // 1. New a flink execution environment.
+    val environment = StreamExecutionEnvironment.getExecutionEnvironment
+    // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+  
+    // 2. Create a flink data source; it may be a Kafka source, a custom source, or another source.
+    // The data type of the source should be Array[AnyRef].
+    // The array length should equal the table column count, and the order of values in the array should match the table column order.
+    val source = ...
+    // 3. Create flink stream and set source.
+    val stream = environment.addSource(source)
+    // 4. Add other flink operators here.
+    // ...
+    // 5. Set flink stream target (write data to carbon with a write sink).
+    stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+    // 6. Run flink stream.
+    try {
+      environment.execute
+    } catch {
+      case exception: Exception =>
+        // Handle execute exception here.
+    }
+  ```
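
The guide does not show a concrete source. The following is only a sketch of a custom source that emits rows in the required `Array[AnyRef]` shape; the three-column (id, name, amount) layout and the emitted values are assumptions for illustration:

```scala
// Minimal sketch of a custom source producing Array[AnyRef] rows.
// The number and order of array elements must match the target table's columns.
import org.apache.flink.streaming.api.functions.source.SourceFunction

class SampleSource extends SourceFunction[Array[AnyRef]] {
  @volatile private var running = true

  override def run(context: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
    var id = 0L
    while (running) {
      // One array per row, values ordered as (id, name, amount) to match the assumed table schema.
      context.collect(Array[AnyRef](
        java.lang.Long.valueOf(id),
        s"name_$id",
        java.lang.Double.valueOf(id * 1.5)
      ))
      id += 1
      Thread.sleep(1000L)
    }
  }

  override def cancel(): Unit = running = false
}

// Used as the source in step 2 of the pseudo code:
// val stream = environment.addSource(new SampleSource)
```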
+
+### Writer properties
+
+#### Local Writer
+
+  | Property                             | Name                                 | Description                                                                                             |
+  |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually a local path; data is written to the temp path first and finally moved to the target data path.  |
+  | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | When the written data count reaches the threshold, the writer flushes the data and moves it to the target data path. |
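
For illustration only, the local writer properties from the table above could be set as shown below. The values are placeholders, and it is assumed that `CarbonLocalProperty` lives in the `org.apache.carbon.flink` package alongside `CarbonWriterFactory`:

```scala
import java.util.Properties
import org.apache.carbon.flink.CarbonLocalProperty

val writerProperties = new Properties
// Data is written to this local temp path first and moved to the target data path on commit.
writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, "/data/temp/")
// Flush and move data to the target path once this many rows have been written (placeholder value).
writerProperties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100000")
```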
+
+#### S3 Writer
+
+  | Property                          | Name                              | Description                                                                                             |
+  |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+  | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+  | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+  | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+  | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually a local path; data is written to the temp path first and finally moved to the target data path. |
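
Likewise for the S3 writer, a sketch of the writer properties (the credentials and endpoint are placeholders, and `CarbonS3Property` is assumed to be in `org.apache.carbon.flink`):

```scala
import java.util.Properties
import org.apache.carbon.flink.CarbonS3Property

val writerProperties = new Properties
writerProperties.setProperty(CarbonS3Property.ACCESS_KEY, "<your-access-key>")   // placeholder
writerProperties.setProperty(CarbonS3Property.SECRET_KEY, "<your-secret-key>")   // placeholder
writerProperties.setProperty(CarbonS3Property.ENDPOINT, "http://s3.example.com") // placeholder endpoint
writerProperties.setProperty(CarbonS3Property.DATA_TEMP_PATH, "/data/temp/")     // temp path before data is moved to the target path
```

The factory would then be built with the 'S3' writer type instead of 'Local', i.e. `CarbonWriterFactory.builder("S3")`, as mentioned in the pseudo code above.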

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625651573


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2973/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625654061


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1255/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] Indhumathi27 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

Indhumathi27 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-625673191


   @niuge01 Please fix the remaining comments


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] Indhumathi27 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

Indhumathi27 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-628382519


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] kunal642 commented on pull request #3752: [CARBONDATA-3804] Provide end-to-end flink integration guide

GitBox
In reply to this post by GitBox

kunal642 commented on pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#issuecomment-629051386


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]

