[GitHub] [carbondata] akashrn5 opened a new pull request #3875: [WIP]Presto write transactional


GitBox

akashrn5 opened a new pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875


    ### Why is this PR needed?
   
   
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666285415


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3557/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666285691


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1817/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666375381


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3562/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666376506


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1823/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666583588


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3570/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666586295


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1831/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666973773


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3574/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-666976847


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1835/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-680843243


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2139/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [WIP]Presto write transactional

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-680843631


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3880/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-689036860


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2268/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-689040369


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4008/
   



[GitHub] [carbondata] jackylk commented on a change in pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

jackylk commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491843798



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
    * property which defines the presto query default value
    */
   public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+  /**
+   * property to send load model from coordinator to worker in presto
+   */
+  public static final String CARBON_PRESTO_LOAD_MODEL = "presto.carbondata.encoded.loadmodel";

Review comment:
       Is this item configurable by the user? If not, I suggest moving it to the presto integration module only.
   Also, I think the key should start with "carbondata" rather than "presto", since it is a CarbonData configuration.





[GitHub] [carbondata] jackylk commented on a change in pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

jackylk commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491844214



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
    * property which defines the presto query default value
    */
   public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+  /**
+   * property to send load model from coordinator to worker in presto

Review comment:
       Please describe in which case a user would set this to true.





[GitHub] [carbondata] akashrn5 commented on a change in pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491866753



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
    * property which defines the presto query default value
    */
   public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+  /**
+   * property to send load model from coordinator to worker in presto
+   */
+  public static final String CARBON_PRESTO_LOAD_MODEL = "presto.carbondata.encoded.loadmodel";

Review comment:
       It is not configurable by the user, you are right. I have moved the constant to `CarbonTableConfig` in the presto module and renamed it to `carbondata.presto.encoded.loadmodel`.





[GitHub] [carbondata] akashrn5 commented on a change in pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491867047



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
    * property which defines the presto query default value
    */
   public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+  /**
+   * property to send load model from coordinator to worker in presto

Review comment:
       This is not a user config; same as the comment above. Moved to the presto module.





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-696026596


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4143/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-696038397


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2402/
   



[GitHub] [carbondata] jackylk commented on a change in pull request #3875: [CARBONDATA-3934]Support write transactional table with presto.

GitBox
In reply to this post by GitBox

jackylk commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491960506



##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
 
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
     Random random = new Random();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
     org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
         new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
-    CarbonLoadModel carbonLoadModel =
-        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
-    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {

Review comment:
       Please add a comment explaining why this is needed.

##########
File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
##########
@@ -76,7 +76,7 @@
 // TODO Move dictionary generator which is coded in spark to MR framework.
 public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
 
-  protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
+  public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";

Review comment:
       Please move all public variables together to the beginning of this class.

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
 
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
     Random random = new Random();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
     org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
         new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
-    CarbonLoadModel carbonLoadModel =
-        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
-    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {
+      carbonLoadModel =
+          (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
+    if (null == carbonLoadModel) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+      String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+      carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+      CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+      String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+      jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr);

Review comment:
       Please add a comment explaining what this is doing.
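
For readers following this comment: the hunk above reads an encoded load model from the job configuration under `CarbonTableOutputFormat.LOAD_MODEL`, decodes it when present, and otherwise builds a model and stores it. A minimal sketch of that decode-or-build pattern follows. It is not the PR's code: `LoadModelHandoff`, the reduced `LoadModel` class, and the `Map` standing in for `JobConf` are hypothetical, and Java serialization plus Base64 is only an assumed encoding, not necessarily what `ObjectSerializationUtil` does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the decode-or-build branch in setupJob. */
final class LoadModelHandoff {

  // Same key string as CarbonTableOutputFormat.LOAD_MODEL in the diff.
  static final String LOAD_MODEL = "mapreduce.carbontable.load.model";

  /** Hypothetical, much-reduced stand-in for CarbonLoadModel. */
  static final class LoadModel implements Serializable {
    private static final long serialVersionUID = 1L;
    final String databaseName;
    final String tableName;

    LoadModel(String databaseName, String tableName) {
      this.databaseName = databaseName;
      this.tableName = tableName;
    }
  }

  /** Serializes the model and Base64-encodes it so it fits a string-valued config. */
  static String encode(LoadModel model) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(model);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Reverses encode(): Base64-decodes the text and deserializes the model. */
  static LoadModel decode(String encoded) {
    byte[] raw = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return (LoadModel) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("cannot decode load model", e);
    }
  }

  /**
   * Returns the model shipped in the configuration if there is one; otherwise
   * builds a model locally and publishes it under LOAD_MODEL for later readers.
   */
  static LoadModel resolve(Map<String, String> conf, String db, String table) {
    String encoded = conf.get(LOAD_MODEL);
    if (encoded != null) {
      return decode(encoded);
    }
    LoadModel built = new LoadModel(db, table);
    conf.put(LOAD_MODEL, encode(built));
    return built;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    LoadModel first = resolve(conf, "default", "sales");     // nothing shipped yet: built locally
    LoadModel second = resolve(conf, "ignored", "ignored");  // decoded from the config
    System.out.println(first.tableName + " " + second.tableName);
  }
}
```

The second `resolve` call ignores its arguments because the configuration already carries a model, which is the behaviour the null check in the diff appears to be after.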

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
##########
@@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx
     }
     String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
+    if (taskAttemptID == null) {
+      SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");

Review comment:
       Please add a comment explaining why this is needed. In what case will it be null?
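
The rest of this hunk is not quoted, so how the PR uses the `SimpleDateFormat` is not visible here. One plausible reading is that a synthetic attempt id is built when `mapred.task.id` is not set, for example when the writer is not running inside a MapReduce task. The sketch below illustrates that idea in plain Java under that assumption. `FallbackTaskAttemptId` and its methods are hypothetical; only the `yyyyMMddHHmm` pattern comes from the diff, and the `attempt_..._m_..._N` layout follows the textual form of Hadoop task attempt ids.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/** Hypothetical helper: falls back to a timestamp-based attempt id string. */
final class FallbackTaskAttemptId {

  /**
   * Builds "attempt_<yyyyMMddHHmm>_<job>_m_<task>_<attempt>", using the
   * timestamp as the job tracker identifier.
   */
  static String forTimestamp(Date now, int jobNumber, int taskNumber, int attemptNumber) {
    String jtIdentifier = new SimpleDateFormat("yyyyMMddHHmm", Locale.ROOT).format(now);
    return String.format(Locale.ROOT, "attempt_%s_%04d_m_%06d_%d",
        jtIdentifier, jobNumber, taskNumber, attemptNumber);
  }

  /** Uses the configured id when present, otherwise a synthetic one. */
  static String resolve(String configured, Date now) {
    return configured != null ? configured : forTimestamp(now, 0, 0, 0);
  }

  public static void main(String[] args) {
    // No "mapred.task.id" available: a synthetic id is produced.
    System.out.println(resolve(null, new Date()));
  }
}
```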

##########
File path: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
##########
@@ -40,6 +40,12 @@
   private String endPoint;
   private String pushRowFilter;
 
+  /**
+   * Property to send load model from coordinator to worker in presto. This is internal constant
+   * and not exposed to user.

Review comment:
       Describe what the option values are.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {

Review comment:
       Why not check encodedLoadModel against true or false?
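
A note on this question, hedged because the author's reply is not part of this excerpt: the property's Javadoc says it sends the load model from coordinator to worker, which suggests the value is an encoded payload rather than a boolean flag. If that is the case, a presence check is the meaningful test. The sketch below shows the difference; `EncodedPropertyCheck` is hypothetical and the sample string is made up.

```java
/** Hypothetical illustration: presence check versus boolean parsing. */
final class EncodedPropertyCheck {

  /** Mirrors StringUtils.isNotEmpty from the diff without the commons-lang3 dependency. */
  static boolean hasPayload(String value) {
    return value != null && !value.isEmpty();
  }

  public static void main(String[] args) {
    // Made-up Base64 text standing in for an encoded load model.
    String encoded = "rO0ABXNyAA==";
    System.out.println(hasPayload(encoded));
    // Boolean.parseBoolean is true only for the text "true" (case-insensitive),
    // so it cannot tell an encoded payload from an unset property.
    System.out.println(Boolean.parseBoolean(encoded));
  }
}
```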

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");

Review comment:
       Do not assign this.outPutPath repeatedly.
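
In the quoted constructor, the field is first set from the argument and then overwritten from the `location` property, and the parameter is reassigned as well. A minimal sketch of a single-assignment version follows, assuming the `location` property is the value that should win, since it is the one that survives in the diff. `SingleAssignmentWriter` is hypothetical, and `java.nio.file.Path` stands in for Hadoop's `Path` to keep the example self-contained.

```java
import static java.util.Objects.requireNonNull;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical writer showing one assignment to a final output path. */
final class SingleAssignmentWriter {

  // final: the compiler rejects any second assignment.
  private final Path outputPath;

  SingleAssignmentWriter(Properties properties) {
    requireNonNull(properties, "properties is null");
    String location =
        requireNonNull(properties.getProperty("location"), "location is not set");
    this.outputPath = Paths.get(location);
  }

  Path getOutputPath() {
    return outputPath;
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("location", "warehouse/db/table");
    System.out.println(new SingleAssignmentWriter(properties).getOutputPath());
  }
}
```

Declaring the field `final` turns the reviewer's request into something the compiler enforces.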

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {

Review comment:
       move @Override to previous line
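
For illustration, the requested layout puts the annotation on its own line above the method signature. This is a small sketch with hypothetical names (`ByteCountingWriterSketch` stands in for `HiveFileWriter`), not the PR's code.

```java
// Hypothetical interface standing in for HiveFileWriter.
interface ByteCountingWriterSketch {
  long getWrittenBytes();
}

final class OverridePlacementSketch implements ByteCountingWriterSketch {

  private final long writtenBytes;

  OverridePlacementSketch(long writtenBytes) {
    this.writtenBytes = writtenBytes;
  }

  // The annotation sits on the line before the signature, not inline with it.
  @Override
  public long getWrittenBytes() {
    return writtenBytes;
  }

  public static void main(String[] args) {
    System.out.println(new OverridePlacementSketch(128L).getWrittenBytes());
  }
}
```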

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import io.prestosql.plugin.hive.HiveHandleResolver;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+public class CarbonDataHandleResolver extends HiveHandleResolver {
+
+  @Override public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {

Review comment:
       move all @Override to previous line

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import com.google.inject.Inject;
+import io.airlift.concurrent.BoundedExecutor;
+import io.airlift.json.JsonCodec;
+import io.airlift.log.Logger;
+import io.prestosql.plugin.hive.ForHive;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
+import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
+import io.prestosql.spi.type.TypeManager;
+import org.joda.time.DateTimeZone;
+
+public class CarbonMetadataFactory extends HiveMetadataFactory {
+
+  private static final Logger log = Logger.get(HiveMetadataFactory.class);
+  private final boolean allowCorruptWritesForTesting;
+  private final boolean skipDeletionForAlter;
+  private final boolean skipTargetCleanupOnRollback;
+  private final boolean writesToNonManagedTablesEnabled = true;
+  private final boolean createsOfNonManagedTablesEnabled;
+  private final long perTransactionCacheMaximumSize;
+  private final HiveMetastore metastore;
+  private final HdfsEnvironment hdfsEnvironment;
+  private final HivePartitionManager partitionManager;
+  private final DateTimeZone timeZone;
+  private final TypeManager typeManager;
+  private final LocationService locationService;
+  private final BoundedExecutor renameExecution;
+  private final TypeTranslator typeTranslator;
+  private final String prestoVersion;
+  private final AccessControlMetadataFactory accessControlMetadataFactory;
+  private final JsonCodec partitionUpdateCodec;
+
+  @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
+      @ForHive ExecutorService executorService, TypeManager typeManager,
+      LocationService locationService, JsonCodec<PartitionUpdate> partitionUpdateCodec,
+      TypeTranslator typeTranslator, NodeVersion nodeVersion,
+      AccessControlMetadataFactory accessControlMetadataFactory) {
+    this(metastore, hdfsEnvironment, partitionManager, hiveConfig.getDateTimeZone(),
+        hiveConfig.getMaxConcurrentFileRenames(), hiveConfig.getAllowCorruptWritesForTesting(),
+        hiveConfig.isSkipDeletionForAlter(), hiveConfig.isSkipTargetCleanupOnRollback(),
+        hiveConfig.getWritesToNonManagedTablesEnabled(),
+        hiveConfig.getCreatesOfNonManagedTablesEnabled(),
+        hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager, locationService,
+        partitionUpdateCodec, executorService, typeTranslator, nodeVersion.toString(),
+        accessControlMetadataFactory);
+  }
+
+  public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment,
+      HivePartitionManager partitionManager, DateTimeZone timeZone, int maxConcurrentFileRenames,
+      boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,

Review comment:
       too many parameters

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data

Review comment:
       age data?

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;

Review comment:
       This can be final

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();

Review comment:
       use `this` for all field assignment in the constructor
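
A minimal sketch of what consistent `this.` qualification could look like in a constructor of this shape. The class name and the `COLUMNS_KEY` constant are hypothetical and stand in for the writer and for `IOConstants.COLUMNS`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

// Hypothetical class; only illustrates the naming convention from the comment.
final class ThisQualifiedFieldsSketch {

  // Hypothetical key standing in for IOConstants.COLUMNS.
  private static final String COLUMNS_KEY = "columns";

  private final List<String> columnNames;
  private final int fieldCount;

  ThisQualifiedFieldsSketch(Properties properties) {
    Objects.requireNonNull(properties, "properties is null");
    // Every field assignment is written with an explicit `this.` qualifier,
    // which makes fields easy to tell apart from locals and parameters.
    this.columnNames = Arrays.asList(properties.getProperty(COLUMNS_KEY, "").split(","));
    this.fieldCount = this.columnNames.size();
  }

  int getFieldCount() {
    return this.fieldCount;
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("columns", "id,name,age");
    System.out.println(new ThisQualifiedFieldsSketch(properties).getFieldCount());
  }
}
```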

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveLocationService;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.spi.connector.ConnectorSession;
+import org.apache.hadoop.fs.Path;
+
+public class CarbonDataLocationService extends HiveLocationService {
+
+  private final HdfsEnvironment hdfsEnvironment;
+
+  @Inject
+  public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) {
+    super(hdfsEnvironment);
+    this.hdfsEnvironment = hdfsEnvironment;
+  }
+
+  @Override
+  public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
+      ConnectorSession session, String schemaName, String tableName) {
+    // TODO: check and make it compatible for cloud scenario

Review comment:
       What is missing here?

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));

Review comment:
       do not assign this.outPutPath repeatedly

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+  private HdfsEnvironment hdfsEnvironment;
+  private SemiTransactionalHiveMetastore metastore;
+  private MapredCarbonOutputCommitter carbonOutputCommitter;
+  private JobContextImpl jobContext;
+
+  public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone,
+      boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled,

Review comment:
       Too many parameters, should create a parameter object
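
       One way to read this suggestion is sketched below. It is only an illustration: `MetadataOptions`, its builder, and its field names are hypothetical and do not come from the PR or from the Presto Hive plugin. The sketch assumes the constructor's scalar settings (time zone and the two boolean flags) can be grouped into one immutable object, so the constructor takes a single argument for them.

```java
import java.util.Objects;

// Hypothetical parameter object. The class and field names are illustrative
// only; they are not part of the PR or of the Presto Hive plugin.
final class MetadataOptions {
  private final String timeZoneId;
  private final boolean allowCorruptWritesForTesting;
  private final boolean writesToNonManagedTablesEnabled;

  private MetadataOptions(Builder builder) {
    // Fail fast if a required setting was never supplied.
    this.timeZoneId = Objects.requireNonNull(builder.timeZoneId, "timeZoneId is null");
    this.allowCorruptWritesForTesting = builder.allowCorruptWritesForTesting;
    this.writesToNonManagedTablesEnabled = builder.writesToNonManagedTablesEnabled;
  }

  static Builder builder() {
    return new Builder();
  }

  String getTimeZoneId() {
    return timeZoneId;
  }

  boolean isAllowCorruptWritesForTesting() {
    return allowCorruptWritesForTesting;
  }

  boolean isWritesToNonManagedTablesEnabled() {
    return writesToNonManagedTablesEnabled;
  }

  // Builder keeps call sites readable: each setting is named where it is set.
  static final class Builder {
    private String timeZoneId;
    private boolean allowCorruptWritesForTesting;
    private boolean writesToNonManagedTablesEnabled;

    Builder timeZoneId(String value) {
      this.timeZoneId = value;
      return this;
    }

    Builder allowCorruptWritesForTesting(boolean value) {
      this.allowCorruptWritesForTesting = value;
      return this;
    }

    Builder writesToNonManagedTablesEnabled(boolean value) {
      this.writesToNonManagedTablesEnabled = value;
      return this;
    }

    MetadataOptions build() {
      return new MetadataOptions(this);
    }
  }
}
```

       With such an object, adjacent boolean arguments can no longer be swapped by accident at the call site. Note that the subclass may still have to pass the full argument list to its superclass constructor, so the gain is limited to the subclass's own signature.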

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.metastore.StorageFormat;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataFileWriterFactory implements HiveFileWriterFactory {
+
+  private final HdfsEnvironment hdfsEnvironment;
+  private final TypeManager typeManager;
+  private final NodeVersion nodeVersion;
+  private final FileFormatDataSourceStats stats;
+
+  @Inject
+  public CarbonDataFileWriterFactory(HdfsEnvironment hdfsEnvironment, TypeManager typeManager,
+      NodeVersion nodeVersion, FileFormatDataSourceStats stats) {
+    this(typeManager, hdfsEnvironment, nodeVersion, stats);
+  }
+
+  public CarbonDataFileWriterFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvironment,
+      NodeVersion nodeVersion,
+      FileFormatDataSourceStats stats)
+  {

Review comment:
       Please improve the formatting of lines 52 to 55.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import io.airlift.event.client.EventClient;
+import io.airlift.json.JsonCodec;
+import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HivePageSink;
+import io.prestosql.plugin.hive.HivePageSinkProvider;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveWritableTableHandle;
+import io.prestosql.plugin.hive.HiveWriterStats;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.OrcFileWriterFactory;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
+import io.prestosql.plugin.hive.metastore.SortingColumn;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorPageSink;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.type.TypeManager;
+
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+public class CarbonDataPageSinkProvider extends HivePageSinkProvider {
+
+  private final Set<HiveFileWriterFactory> fileWriterFactories;
+  private final HdfsEnvironment hdfsEnvironment;
+  private final PageSorter pageSorter;
+  private final HiveMetastore metastore;
+  private final PageIndexerFactory pageIndexerFactory;
+  private final TypeManager typeManager;
+  private final int maxOpenPartitions;
+  private final int maxOpenSortFiles;
+  private final DataSize writerSortBufferSize;
+  private final boolean immutablePartitions;
+  private final LocationService locationService;
+  private final ListeningExecutorService writeVerificationExecutor;
+  private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
+  private final NodeManager nodeManager;
+  private final EventClient eventClient;
+  private final HiveSessionProperties hiveSessionProperties;
+  private final HiveWriterStats hiveWriterStats;
+  private final OrcFileWriterFactory orcFileWriterFactory;
+  private final long perTransactionMetastoreCacheMaximumSize;
+
+  @Inject
+  public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory> fileWriterFactories,
+      HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore metastore,

Review comment:
       Too many parameters, should create a parameter object

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {
+    if (isCommitDone) {
+      try {
+        return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return 0;
+  }
+
+  @Override public long getSystemMemoryUsage() {
+    return 0;
+  }
+
+  @Override public void appendRows(Page dataPage) {
+    for (int position = 0; position < dataPage.getPositionCount(); position++) {
+      appendRow(dataPage, position);
+    }
+  }
+
+  public void appendRow(Page dataPage, int position) {

Review comment:
       why is it public?

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.util.CarbonDataStoreCreator
+
+class PrestoInsertIntoTableTestCase extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)
+
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val storePath = s"$rootPath/integration/presto/target/store"
+  private val systemPath = s"$rootPath/integration/presto/target/system"
+  private val prestoServer = new PrestoServer
+
+  override def beforeAll: Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+    map.put("hive.allow-drop-table", "true")
+    prestoServer.startServer("testdb", map)
+    prestoServer.execute("drop schema if exists testdb")
+    prestoServer.execute("create schema testdb")
+  }
+
+  override protected def beforeEach(): Unit = {
+    val query = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    createTable(query, "testdb", "testtable")
+  }
+
+  private def createTable(query: String, databaseName: String, tableName: String): Unit = {
+    prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}")
+    prestoServer.execute(query)
+    logger.info("Creating The Carbon Store")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier(databaseName, tableName)
+    CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true)
+    logger.info(s"\nCarbon store is created at location: $storePath")
+  }
+
+  private def getAbsoluteIdentifier(dbName: String,
+      tableName: String) = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      storePath + "/" + dbName + "/" + tableName,
+      new CarbonTableIdentifier(dbName,
+        tableName,
+        UUID.randomUUID().toString))
+    absoluteTableIdentifier
+  }
+
+  test("test insert with different storage format names") {
+    val query1 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    val query2 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') "
+    val query3 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='ORG.APACHE.CARBONDATA.FORMAT') "
+    createTable(query1, "testdb", "testtable")
+    createTable(query2, "testdb", "testtable")
+    createTable(query3, "testdb", "testtable")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
+    assert(FileFactory.getCarbonFile(segmentPath).isFileExist)
+  }
+
+  test("test insert into one segment and check folder structure") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val tablePath = carbonTable.getTablePath
+    val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0")
+    val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1")
+    val segment0 = FileFactory.getCarbonFile(segment0Path)
+    assert(segment0.isFileExist)
+    assert(segment0.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segment1 = FileFactory.getCarbonFile(segment1Path)
+    assert(segment1.isFileExist)
+    assert(segment1.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+    assert(FileFactory.getCarbonFile(segmentsPath).isFileExist && FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
+    FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE)
+      }
+    })
+  }
+
+  test("test insert into many segments and check segment count and data count") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+    val actualResult1: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable")
+    val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4))
+    assert(actualResult1.equals(expectedResult1))
+    // filter query
+    val actualResult2: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp '1998-12-16 10:12:09'")
+    val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+    assert(actualResult2.equals(expectedResult2))
+  }
+
+  test("test if the table status contains the segment file name for each load") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+    ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment =>
+      val loadMetadataDetails = segment.getLoadMetadataDetails
+      assert(loadMetadataDetails.getSegmentFile != null)
+    }
+  }
+

Review comment:
       To test transactional writes, you can add a test case that queries the table while data is being inserted. In that case, the query result should not change until the insertion succeeds.
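
       The property such a test would check can be sketched without Presto or CarbonData. Everything below is a hypothetical in-memory stand-in: `SnapshotTable`, `Insert`, and `TransactionalVisibilityCheck` are not classes from the PR. The sketch assumes that an insert stages its rows privately and publishes them in one atomic step on commit, and it uses latches so the reader observes the table while the insert is still open.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory table: readers only ever see the committed snapshot.
final class SnapshotTable {
  private final AtomicReference<List<String>> committed =
      new AtomicReference<List<String>>(Collections.<String>emptyList());

  long count() {
    return committed.get().size();
  }

  Insert beginInsert() {
    return new Insert();
  }

  // An open insert: rows are staged privately until commit() publishes them.
  final class Insert {
    private final List<String> staged = new ArrayList<>();

    void write(String row) {
      staged.add(row);
    }

    void commit() {
      // Swap in a new snapshot atomically; readers never see a partial insert.
      committed.updateAndGet(current -> {
        List<String> next = new ArrayList<>(current);
        next.addAll(staged);
        return Collections.unmodifiableList(next);
      });
    }
  }
}

final class TransactionalVisibilityCheck {
  // Returns {count seen while the insert is open, count seen after commit}.
  static long[] run() {
    SnapshotTable table = new SnapshotTable();
    CountDownLatch rowsStaged = new CountDownLatch(1);
    CountDownLatch mayCommit = new CountDownLatch(1);

    Thread writer = new Thread(() -> {
      SnapshotTable.Insert insert = table.beginInsert();
      insert.write("row-1");
      insert.write("row-2");
      rowsStaged.countDown();
      try {
        mayCommit.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      insert.commit();
    });
    writer.start();

    try {
      rowsStaged.await();
      long duringInsert = table.count(); // rows are staged but not committed
      mayCommit.countDown();
      writer.join();
      long afterCommit = table.count();
      return new long[] {duringInsert, afterCommit};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for writer", e);
    }
  }
}
```

       A real test case would replace the stand-in with an insert issued on one thread and a `select count(*)` issued on another, and it would need a comparable way to make sure the query runs before the insert commits.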

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {
+    if (isCommitDone) {
+      try {
+        return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return 0;
+  }
+
+  @Override public long getSystemMemoryUsage() {
+    return 0;
+  }
+
+  @Override public void appendRows(Page dataPage) {
+    for (int position = 0; position < dataPage.getPositionCount(); position++) {
+      appendRow(dataPage, position);
+    }
+  }
+
+  public void appendRow(Page dataPage, int position) {
+    for (int field = 0; field < fieldCount; field++) {
+      Block block = dataPage.getBlock(field);
+      if (block.isNull(position)) {
+        tableInspector.setStructFieldData(row, structFields.get(field), null);
+      } else {
+        setters[field].setField(block, position);
+      }
+    }
+
+    try {
+      recordWriter.write(serDe.serialize(row, tableInspector));
+    } catch (SerDeException | IOException e) {
+      throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
+    }
+  }
+
+  @Override public void commit() {
+    try {
+      recordWriter.close(false);
+    } catch (Exception ex) {
+      LOG.error("Error while closing the record writer", ex);
+      throw new RuntimeException(ex);
+    }
+    isCommitDone = true;
+  }
+
+  @Override public void rollback() {
+

Review comment:
       This method should throw UnsupportedOperationException instead of being left empty.
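
       A minimal, self-contained sketch of what this comment asks for. `FileWriterSketch` and `RollbackUnsupportedWriter` are hypothetical stand-ins, not the real `HiveFileWriter` or `CarbonDataFileWriter`; only the shape of `rollback()` is the point here.

```java
// Hypothetical stand-in for the writer interface; the real HiveFileWriter
// in Presto declares more methods and is not reproduced here.
interface FileWriterSketch {
  void commit();

  void rollback();
}

// Hypothetical writer that refuses rollback explicitly.
class RollbackUnsupportedWriter implements FileWriterSketch {

  private boolean commitDone;

  @Override
  public void commit() {
    // Placeholder for closing the underlying record writer.
    commitDone = true;
  }

  @Override
  public void rollback() {
    // Fail loudly rather than silently doing nothing, so callers
    // cannot mistake an empty method for a successful rollback.
    throw new UnsupportedOperationException("rollback is not supported by this writer");
  }

  boolean isCommitDone() {
    return commitDone;
  }
}
```

       With an empty body, a caller has no way to tell that nothing was undone; the exception makes the limitation visible at the call site.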

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+  private HdfsEnvironment hdfsEnvironment;
+  private SemiTransactionalHiveMetastore metastore;
+  private MapredCarbonOutputCommitter carbonOutputCommitter;
+  private JobContextImpl jobContext;
+
+  public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone,
+      boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled,
+      boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+      LocationService locationService,
+      io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
+      TypeTranslator typeTranslator, String prestoVersion,
+      HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata accessControlMetadata) {
+    super(metastore, hdfsEnvironment, partitionManager, timeZone, allowCorruptWritesForTesting,
+        true, createsOfNonManagedTablesEnabled, typeManager,
+        locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
+        hiveStatisticsProvider, accessControlMetadata);
+    this.hdfsEnvironment = hdfsEnvironment;
+    this.metastore = metastore;
+  }
+
+  @Override
+  public CarbonDataInsertTableHandle beginInsert(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
+    HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session, tableHandle);
+    SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName();
+    Optional<Table> table =
+        this.metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
+    Path outputPath =
+        new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath());
+    JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment
+        .getConfiguration(
+            new HdfsEnvironment.HdfsContext(session, hiveInsertTableHandle.getSchemaName(),
+                hiveInsertTableHandle.getTableName()),
+            new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath())));
+    jobConf.set("location", outputPath.toString());
+    Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get());
+    try {
+      CarbonLoadModel carbonLoadModel =
+          HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf);
+
+      CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel);
+    } catch (IOException ex) {
+      LOG.error("Error while creating carbon load model", ex);
+      throw new RuntimeException(ex);
+    }
+    try {
+      carbonOutputCommitter = new MapredCarbonOutputCommitter();
+      jobContext = new JobContextImpl(jobConf, new JobID());
+      carbonOutputCommitter.setupJob(jobContext);
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf);
+    } catch (IOException e) {
+      LOG.error("error setting the output committer", e);
+      throw new RuntimeException("error setting the output committer");
+    }
+    return new CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(),
+        hiveInsertTableHandle.getTableName(),
+        hiveInsertTableHandle.getInputColumns(),
+        hiveInsertTableHandle.getPageSinkMetadata(),
+        hiveInsertTableHandle.getLocationHandle(),
+        hiveInsertTableHandle.getBucketProperty(), hiveInsertTableHandle.getTableStorageFormat(),
+        hiveInsertTableHandle.getPartitionStorageFormat(),
+        ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL,
+            jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL)));
+  }
+
+  @Override
+  public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
+      ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
+      Collection<ComputedStatistics> computedStatistics) {
+    Optional<ConnectorOutputMetadata> connectorOutputMetadata =
+        super.finishInsert(session, insertHandle, fragments, computedStatistics);
+    try {
+      carbonOutputCommitter.commitJob(jobContext);
+    } catch (IOException e) {
+      LOG.error("Error occurred while committing the insert job.", e);
+      throw new RuntimeException(e);

Review comment:
       If an exception is thrown here, super.finishInsert has already been called. Would it be better to call super.finishInsert after commitJob?
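
       The ordering suggested above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `BaseMetadataSketch`, `CommitterSketch` and `CommitFirstMetadata` are hypothetical stand-ins for `HiveMetadata`, `MapredCarbonOutputCommitter` and `CarbonDataMetaData`, and the `events` list exists only to make the call order observable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parent metadata class.
class BaseMetadataSketch {
  final List<String> events = new ArrayList<>();

  void finishInsert() {
    events.add("super.finishInsert");
  }
}

// Hypothetical stand-in for the output committer.
interface CommitterSketch {
  void commitJob() throws IOException;
}

// Commits the job first and only then lets the parent finish the insert.
class CommitFirstMetadata extends BaseMetadataSketch {

  private final CommitterSketch committer;

  CommitFirstMetadata(CommitterSketch committer) {
    this.committer = committer;
  }

  @Override
  void finishInsert() {
    try {
      committer.commitJob();
      events.add("commitJob");
    } catch (IOException e) {
      // The parent has not been called yet, so nothing on its side
      // needs to be undone when the commit fails.
      throw new UncheckedIOException(e);
    }
    super.finishInsert();
  }
}
```

       The trade-off runs the other way as well: with this order, a failure inside the parent's finishInsert happens after the job has already been committed, so whichever order is chosen, the later step's failure still has to be handled.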

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and creates the carbonFileWriter to write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(

Review comment:
       Move `ImmutableList.copyOf(` to the next line and tidy up the formatting.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HiveBucketProperty;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadata;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataInsertTableHandle extends HiveInsertTableHandle implements
+    ConnectorInsertTableHandle {
+
+  private final Map<String, String> additionalConf;
+
+  @JsonCreator public CarbonDataInsertTableHandle(@JsonProperty("schemaName") String schemaName,

Review comment:
       Move the first parameter to the next line.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
##########
@@ -214,4 +226,8 @@ private static void setCarbonEnum() throws Exception {
     hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
     values.set(null, hiveStorageFormats);
   }
-}
\ No newline at end of file
+
+  @Override public ConnectorHandleResolver getHandleResolver() {

Review comment:
       Move `@Override` to the previous line.





