akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492015787

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[... Apache license header, package declaration, and imports ...]
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
[...]
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {

Review comment: The `encodedLoadModel` value is a string, namely the serialized load model. We use this property just to set the encoded load model prepared in `setUpJob()` into the conf, so that it is transferred from the coordinator to all the workers.
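The round trip described in that reply can be sketched as follows. This is a minimal illustration only, assuming the encode/decode helpers behave like CarbonData's ObjectSerializationUtil (Java serialization plus gzip plus Base64); the class and method names below are illustrative, not the project's actual API.

    import java.io.*;
    import java.util.Base64;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public final class LoadModelCodec {

      // Coordinator side: flatten the load model into a conf-safe string,
      // e.g. conf.set(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL, encode(model)).
      public static String encode(Serializable model) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(bytes))) {
          out.writeObject(model);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
      }

      // Worker side: rebuild the object from the string read out of the conf.
      public static Object decode(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(
            new GZIPInputStream(new ByteArrayInputStream(raw)))) {
          return in.readObject();
        }
      }
    }

Passing the model as one opaque conf string keeps the coordinator-to-worker handoff independent of how presto schedules the writers.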
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492017696

##########
File path: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
##########
@@ -40,6 +40,12 @@
   private String endPoint;
   private String pushRowFilter;
+  /**
+   * Property to send load model from coordinator to worker in presto. This is internal constant
+   * and not exposed to user.

Review comment: As said in the comment above, it is the same: we use this as the property name to send the load model from the coordinator to the workers, so its value will be the load model prepared for each load.
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492039832

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
##########
@@ -0,0 +1,62 @@
[... Apache license header, package declaration, and imports ...]
+public class CarbonDataLocationService extends HiveLocationService {
+
+  private final HdfsEnvironment hdfsEnvironment;
+
+  @Inject
+  public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) {
+    super(hdfsEnvironment);
+    this.hdfsEnvironment = hdfsEnvironment;
+  }
+
+  @Override
+  public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
+      ConnectorSession session, String schemaName, String tableName) {
+    // TODO: check and make it compatible for cloud scenario

Review comment: If we do not override these methods, presto gives each writer a temporary write path, much like the temp path carbon itself uses during writing, and that conflicts with our write flow. So I have overridden them and made the write path and the target path the same. In the presto superclass, for S3 or any encrypted stores, they do not create the temporary write (staging) path. So basically we need to test this once on S3 or OBS and then remove this TODO if it works fine; that is why I added it. Since I did not have an S3/OBS setup I could not test it there; I tested on HDFS. You can refer to:
https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L55
https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L76
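For readers following along, the override being discussed amounts to returning a handle whose write path and target path are identical. A hedged sketch, assuming the LocationHandle(targetPath, writePath, isExisting, writeMode) constructor, the WriteMode enum, and the HiveWriteUtils.getTableDefaultLocation signature of the prestosql version this PR builds against; verify against the sources linked above before relying on it.

    // Sketch: make presto write directly into the table location instead of a
    // temporary/staging directory, so it does not clash with carbon's own
    // temp-path handling. Assumes prestosql's LocationHandle API of this era.
    @Override
    public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
        ConnectorSession session, String schemaName, String tableName) {
      HdfsEnvironment.HdfsContext context =
          new HdfsEnvironment.HdfsContext(session, schemaName, tableName);
      Path targetPath = HiveWriteUtils.getTableDefaultLocation(
          context, metastore, hdfsEnvironment, schemaName, tableName);
      // writePath == targetPath: no rename/move step at commit time.
      return new LocationHandle(targetPath, targetPath, false,
          LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY);
    }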
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492044263

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
[... Apache license header, package declaration, imports, fields, and constructor ...]
+public class CarbonDataMetaData extends HiveMetadata {
[...]
+  @Override
+  public CarbonDataInsertTableHandle beginInsert(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
+    HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session, tableHandle);
[...]
+    try {
+      carbonOutputCommitter = new MapredCarbonOutputCommitter();
+      jobContext = new JobContextImpl(jobConf, new JobID());
+      carbonOutputCommitter.setupJob(jobContext);
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf);
+    } catch (IOException e) {
+      LOG.error("error setting the output committer", e);
+      throw new RuntimeException("error setting the output committer");
+    }
+    return new CarbonDataInsertTableHandle( /* ... handle fields ... */
+        ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL,
+            jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL)));
+  }
+
+  @Override
+  public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
+      ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
+      Collection<ComputedStatistics> computedStatistics) {
+    Optional<ConnectorOutputMetadata> connectorOutputMetadata =
+        super.finishInsert(session, insertHandle, fragments, computedStatistics);
+    try {
+      carbonOutputCommitter.commitJob(jobContext);
+    } catch (IOException e) {
+      LOG.error("Error occurred while committing the insert job.", e);
+      throw new RuntimeException(e);

Review comment: If you look at super.finishInsert, it does not do much for our transactional case. Carbon's read path understands our commit job, basically the table status it writes, so this is fine here. There was also another problem I faced, but since this was developed some months back I cannot exactly remember the issue. With respect to carbon, though, this is fine.
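The pairing this reply relies on is the standard Hadoop OutputCommitter job lifecycle: beginInsert() drives setupJob() and finishInsert() drives commitJob(), and it is the commitJob() step, which writes carbon's table status, that makes the load visible to carbon readers. A stripped-down sketch of that lifecycle, assuming a JobConf that already carries a valid serialized load model; without one, setupJob would fail at runtime.

    import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContextImpl;
    import org.apache.hadoop.mapred.JobID;

    public class CommitLifecycleSketch {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // Assumption: the conf already holds the serialized CarbonLoadModel,
        // as beginInsert() arranges via CarbonTableOutputFormat.setLoadModel.
        JobContextImpl jobContext = new JobContextImpl(jobConf, new JobID());
        MapredCarbonOutputCommitter committer = new MapredCarbonOutputCommitter();
        committer.setupJob(jobContext);   // beginInsert: opens the segment
        // ... workers write carbondata files through the record writers ...
        committer.commitJob(jobContext);  // finishInsert: writes table status
      }
    }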
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492058162

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
[...]
+  public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone,
+      boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled,

Review comment: The superclass takes that many parameters in its constructor, so I followed the same pattern; and since it is called from only one place, it should be fine?
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492058430

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
##########
@@ -0,0 +1,182 @@
[... Apache license header, package declaration, imports, and fields ...]
+public class CarbonDataPageSinkProvider extends HivePageSinkProvider {
[...]
+  @Inject
+  public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory> fileWriterFactories,
+      HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore metastore,

Review comment: The superclass takes that many parameters in its constructor, so I followed the same pattern; it is called from only one place, so it should be fine? This is also constructed through the Inject framework.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
##########
@@ -0,0 +1,134 @@
[... Apache license header, package declaration, imports, fields, and the @Inject constructor ...]
+public class CarbonMetadataFactory extends HiveMetadataFactory {
[...]
+  public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment,
+      HivePartitionManager partitionManager, DateTimeZone timeZone, int maxConcurrentFileRenames,
+      boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,

Review comment: The superclass takes that many parameters in its constructor, so I followed the same pattern; it is called from only one place, so it should be fine? This is also constructed through the Inject framework.
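Context for readers: these constructors are invoked by Guice rather than by hand, so the wide parameter lists never appear at a call site. A hedged sketch of the binding style involved; the module name and binding here are illustrative, not the PR's actual wiring.

    import com.google.inject.Binder;
    import com.google.inject.Module;
    import com.google.inject.Scopes;
    import io.prestosql.plugin.hive.HiveMetadataFactory;

    public class CarbonModuleSketch implements Module {
      @Override
      public void configure(Binder binder) {
        // Guice resolves every parameter of the @Inject constructor from the
        // injector, so no application code ever spells the arguments out.
        binder.bind(HiveMetadataFactory.class)
            .to(CarbonMetadataFactory.class)
            .in(Scopes.SINGLETON);
      }
    }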
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492062721

##########
File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
##########
@@ -76,7 +76,7 @@
 // TODO Move dictionary generator which is coded in spark to MR framework.
 public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
-  protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
+  public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";

Review comment: Done.
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492063105

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
     Random random = new Random();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
     org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
         new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
-    CarbonLoadModel carbonLoadModel =
-        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
-    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {

Review comment: Actually this is base-code refactoring; I have added a comment. @kunal642 please check whether the comment is proper or whether I need to modify it.
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492063411

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
[... same hunk as above, continuing ...]
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {
+      carbonLoadModel =
+          (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
+    if (null == carbonLoadModel) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+      String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+      carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+      CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+      String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+      jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr);

Review comment: I have added a comment for the base code. @kunal642 please check whether the comment is proper or not.
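A note on what the refactored branch above does, with a hedged sketch of the task-side decode. The first branch covers callers (such as the presto integration) that already put the serialized model into the conf; the fallback rebuilds it from the Hive schema and also threads it through MAPRED_MAP_TASK_ENV as a "carbon=<model>" pair. The System.getenv read below is an assumption about how a child task would pick that value up, not code from this PR.

    // Sketch only. Assumes ObjectSerializationUtil.convertStringToObject, as
    // used in the hunk above; the System.getenv fallback is hypothetical.
    CarbonLoadModel carbonLoadModel = null;
    String encoded = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
    if (encoded == null) {
      // mapred.map.child.env entries become environment variables of the task JVM.
      encoded = System.getenv("carbon");
    }
    if (encoded != null) {
      carbonLoadModel =
          (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encoded);
    }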
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492063498

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
##########
@@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx
   }
   String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
   TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
+  if (taskAttemptID == null) {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");

Review comment: Done.
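For context, the fallback being added here synthesizes a task attempt id when "mapred.task.id" is absent, which is the case when the writer runs inside a presto worker rather than an MR task. A minimal sketch of that pattern, assuming Hadoop's mapred TaskAttemptID API; the identifier format is illustrative.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;

    public class TaskAttemptIdFallback {
      static TaskAttemptID attemptIdFor(JobConf jc) {
        TaskAttemptID id = TaskAttemptID.forName(jc.get("mapred.task.id"));
        if (id == null) {
          // No MR task context (e.g. presto worker): derive a unique-enough id
          // from the current time so carbon can still name its output files.
          String jtIdentifier = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
          id = new TaskAttemptID(jtIdentifier, 0, TaskType.MAP, 0, 0);
        }
        return id;
      }
    }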
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492063738

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[...]
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data

Review comment: It is page data; changed.
akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r492063867

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[...]
+  private final JobConf configuration;
+  private Path outPutPath;

Review comment: Done.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[...]
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));

Review comment: Done.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[...]
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();

Review comment: Done.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
[...]
+    structFields = ImmutableList.copyOf(

Review comment: Done.
akashrn5 commented on a change in pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#discussion_r492064374 ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { + configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel); + } + try { + boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT); + Object writer = + Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance(); + recordWriter = ((MapredCarbonOutputFormat<?>) writer) + .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress, + properties, Reporter.NULL); + } catch (Exception e) { + LOG.error("error while initializing writer", e); + throw new RuntimeException("writer class not found"); + } + } + + @Override public long getWrittenBytes() { + if (isCommitDone) { + try { + return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return 0; + } + + @Override public long getSystemMemoryUsage() { + return 0; + } + + @Override public void appendRows(Page dataPage) { + for (int position = 0; position < dataPage.getPositionCount(); position++) { + appendRow(dataPage, position); + } + } + + public void appendRow(Page dataPage, int position) { Review comment: changed ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { + configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel); + } + try { + boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT); + Object writer = + Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance(); + recordWriter = ((MapredCarbonOutputFormat<?>) writer) + .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress, + properties, Reporter.NULL); + } catch (Exception e) { + LOG.error("error while initializing writer", e); + throw new RuntimeException("writer class not found"); + } + } + + @Override public long getWrittenBytes() { + if (isCommitDone) { + try { + return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return 0; + } + + @Override public long getSystemMemoryUsage() { + return 0; + } + + @Override public void appendRows(Page dataPage) { + for (int position = 0; position < dataPage.getPositionCount(); position++) { + appendRow(dataPage, position); + } + } + + public void appendRow(Page dataPage, int position) { + for (int field = 0; field < fieldCount; field++) { + Block block = dataPage.getBlock(field); + if (block.isNull(position)) { + tableInspector.setStructFieldData(row, structFields.get(field), null); + } else { + setters[field].setField(block, position); + } + } + + try { + 
recordWriter.write(serDe.serialize(row, tableInspector)); + } catch (SerDeException | IOException e) { + throw new PrestoException(HIVE_WRITER_DATA_ERROR, e); + } + } + + @Override public void commit() { + try { + recordWriter.close(false); + } catch (Exception ex) { + LOG.error("Error while closing the record writer", ex); + throw new RuntimeException(ex); + } + isCommitDone = true; + } + + @Override public void rollback() { + Review comment: modified ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
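The hunk above elides the body of rollback() (the "modified" reply refers to it). Purely for orientation, one plausible shape for a file writer's rollback is sketched below; it is not the PR's implementation, and it assumes that aborting should close the record writer in abort mode and delete any partially written output.

@Override
public void rollback() {
  // Sketch only: recordWriter, outPutPath and configuration are the fields of
  // the quoted CarbonDataFileWriter. close(true) is the abort flavor of
  // FileSinkOperator.RecordWriter.close, mirroring the close(false) in commit().
  try {
    recordWriter.close(true);
    outPutPath.getFileSystem(configuration).delete(outPutPath, true);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}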
akashrn5 commented on a change in pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#discussion_r492162320 ########## File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.integrationtest + +import java.io.File +import java.util +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.presto.server.PrestoServer +import org.apache.carbondata.presto.util.CarbonDataStoreCreator + +class PrestoInsertIntoTableTestCase extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach { + + private val logger = LogServiceFactory + .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName) + + private val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + private val storePath = s"$rootPath/integration/presto/target/store" + private val systemPath = s"$rootPath/integration/presto/target/system" + private val prestoServer = new PrestoServer + + override def beforeAll: Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + "Presto") + val map = new util.HashMap[String, String]() + map.put("hive.metastore", "file") + map.put("hive.metastore.catalog.dir", s"file://$storePath") + map.put("hive.allow-drop-table", "true") + prestoServer.startServer("testdb", map) + prestoServer.execute("drop schema if exists testdb") + prestoServer.execute("create schema testdb") + } + + override protected def beforeEach(): Unit = { + val query = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') " + createTable(query, "testdb", "testtable") + } + + private def createTable(query: String, databaseName: 
String, tableName: String): Unit = { + prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}") + prestoServer.execute(query) + logger.info("Creating The Carbon Store") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier(databaseName, tableName) + CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true) + logger.info(s"\nCarbon store is created at location: $storePath") + } + + private def getAbsoluteIdentifier(dbName: String, + tableName: String) = { + val absoluteTableIdentifier = AbsoluteTableIdentifier.from( + storePath + "/" + dbName + "/" + tableName, + new CarbonTableIdentifier(dbName, + tableName, + UUID.randomUUID().toString)) + absoluteTableIdentifier + } + + test("test insert with different storage format names") { + val query1 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') " + val query2 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') " + val query3 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='ORG.APACHE.CARBONDATA.FORMAT') " + createTable(query1, "testdb", "testtable") + createTable(query2, "testdb", "testtable") + createTable(query3, "testdb", "testtable") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0") + assert(FileFactory.getCarbonFile(segmentPath).isFileExist) + } + + test("test insert into one segment and check folder structure") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val tablePath = carbonTable.getTablePath + val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0") + val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1") + val segment0 = FileFactory.getCarbonFile(segment0Path) + assert(segment0.isFileExist) + assert(segment0.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) || + file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) + } + }).length == 2) + val segment1 = 
FileFactory.getCarbonFile(segment1Path) + assert(segment1.isFileExist) + assert(segment1.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) || + file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) + } + }).length == 2) + val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath) + assert(FileFactory.getCarbonFile(segmentsPath).isFileExist && FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2) + val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath) + FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE) + } + }) + } + + test("test insert into many segments and check segment count and data count") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath) + assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8) + val actualResult1: List[Map[String, Any]] = prestoServer + .executeQuery("select count(*) AS RESULT from testdb.testtable") + val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4)) + assert(actualResult1.equals(expectedResult1)) + // filter query + val actualResult2: List[Map[String, Any]] = prestoServer + .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp '1998-12-16 10:12:09'") + val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2)) + assert(actualResult2.equals(expectedResult2)) + } + + test("test if the table status contains the segment file name for each load") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) + ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment => + val loadMetadataDetails = segment.getLoadMetadataDetails + assert(loadMetadataDetails.getSegmentFile != null) + } + } + Review comment: added ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
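For readers skimming the Scala, the final test above reduces to one invariant, shown here as a Java sketch: every valid segment recorded in the table status must carry a segment file entry. The Segment import path is an assumption about this era of the codebase; the calls themselves all appear in the test.

import java.io.IOException;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;

public final class SegmentFileCheckSketch {
  static void assertSegmentFilesRecorded(CarbonTable carbonTable) throws IOException {
    SegmentStatusManager ssm =
        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
    // A null segment file here would mean a load committed through Presto
    // without registering its segment file in the table status.
    for (Segment segment : ssm.getValidAndInvalidSegments().getValidSegments()) {
      assert segment.getLoadMetadataDetails().getSegmentFile() != null;
    }
  }
}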
CarbonDataQA1 commented on pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#issuecomment-696264049 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4154/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#issuecomment-696265328 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2413/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
jackylk commented on a change in pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#discussion_r491843798 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2525,4 +2525,9 @@ private CarbonCommonConstants() { * property which defines the presto query default value */ public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false"; + + /** + * property to send load model from coordinator to worker in presto + */ + public static final String CARBON_PRESTO_LOAD_MODEL = "presto.carbondata.encoded.loadmodel"; Review comment: Is this item configurable by the user? If not, I suggest moving it to the Presto integration module only. It would also be better for the key to start with "carbondata" rather than "presto", since this is a CarbonData configuration. ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2525,4 +2525,9 @@ private CarbonCommonConstants() { * property which defines the presto query default value */ public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false"; + + /** + * property to send load model from coordinator to worker in presto Review comment: Please describe in which case a user would set it to true. ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java ########## @@ -52,25 +53,30 @@ @Override public void setupJob(JobContext jobContext) throws IOException { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); - String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); Random random = new Random(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context = new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID); - CarbonLoadModel carbonLoadModel = - HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); - CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + CarbonLoadModel carbonLoadModel = null; + String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL); + if (encodedString != null) { Review comment: Please add a comment explaining why this is needed. ########## File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ########## @@ -76,7 +76,7 @@ // TODO Move dictionary generator which is coded in spark to MR framework.
public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> { - protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; + public static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; Review comment: please move all public variable together into the beginning of this class ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java ########## @@ -52,25 +53,30 @@ @Override public void setupJob(JobContext jobContext) throws IOException { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); - String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); Random random = new Random(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context = new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID); - CarbonLoadModel carbonLoadModel = - HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); - CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + CarbonLoadModel carbonLoadModel = null; + String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL); + if (encodedString != null) { + carbonLoadModel = + (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString); + } + if (null == carbonLoadModel) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); + String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); + carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); + CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL); + jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr); Review comment: please add comment, what is it doing? ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java ########## @@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx } String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath(); TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id")); + if (taskAttemptID == null) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); Review comment: please add comment, why is it needed. In what case it will be null? ########## File path: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java ########## @@ -40,6 +40,12 @@ private String endPoint; private String pushRowFilter; + /** + * Property to send load model from coordinator to worker in presto. This is internal constant + * and not exposed to user. Review comment: describe what are the option values ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { Review comment: why not check encodedLoadModel against true or false? ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. + */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); Review comment: do not assign this.outPutPath repeatedly ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { + configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel); + } + try { + boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT); + Object writer = + Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance(); + recordWriter = ((MapredCarbonOutputFormat<?>) writer) + .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress, + properties, Reporter.NULL); + } catch (Exception e) { + LOG.error("error while initializing writer", e); + throw new RuntimeException("writer class not found"); + } + } + + @Override public long getWrittenBytes() { Review comment: move @Override to previous line ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import io.prestosql.plugin.hive.HiveHandleResolver; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; + +public class CarbonDataHandleResolver extends HiveHandleResolver { + + @Override public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() { Review comment: move all @Override to previous line ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +import com.google.inject.Inject; +import io.airlift.concurrent.BoundedExecutor; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; +import io.prestosql.plugin.hive.ForHive; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveConfig; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HiveMetadataFactory; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.NodeVersion; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.CachingHiveMetastore; +import io.prestosql.plugin.hive.metastore.HiveMetastore; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.security.AccessControlMetadataFactory; +import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider; +import io.prestosql.spi.type.TypeManager; +import org.joda.time.DateTimeZone; + +public class CarbonMetadataFactory extends HiveMetadataFactory { + + private static final Logger log = Logger.get(HiveMetadataFactory.class); + private final boolean allowCorruptWritesForTesting; + private final boolean skipDeletionForAlter; + private final boolean skipTargetCleanupOnRollback; + private final boolean writesToNonManagedTablesEnabled = true; + private final boolean createsOfNonManagedTablesEnabled; + private final long perTransactionCacheMaximumSize; + private final HiveMetastore metastore; + private final HdfsEnvironment hdfsEnvironment; + private final HivePartitionManager 
partitionManager; + private final DateTimeZone timeZone; + private final TypeManager typeManager; + private final LocationService locationService; + private final BoundedExecutor renameExecution; + private final TypeTranslator typeTranslator; + private final String prestoVersion; + private final AccessControlMetadataFactory accessControlMetadataFactory; + private final JsonCodec partitionUpdateCodec; + + @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, + @ForHive ExecutorService executorService, TypeManager typeManager, + LocationService locationService, JsonCodec<PartitionUpdate> partitionUpdateCodec, + TypeTranslator typeTranslator, NodeVersion nodeVersion, + AccessControlMetadataFactory accessControlMetadataFactory) { + this(metastore, hdfsEnvironment, partitionManager, hiveConfig.getDateTimeZone(), + hiveConfig.getMaxConcurrentFileRenames(), hiveConfig.getAllowCorruptWritesForTesting(), + hiveConfig.isSkipDeletionForAlter(), hiveConfig.isSkipTargetCleanupOnRollback(), + hiveConfig.getWritesToNonManagedTablesEnabled(), + hiveConfig.getCreatesOfNonManagedTablesEnabled(), + hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager, locationService, + partitionUpdateCodec, executorService, typeTranslator, nodeVersion.toString(), + accessControlMetadataFactory); + } + + public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, + HivePartitionManager partitionManager, DateTimeZone timeZone, int maxConcurrentFileRenames, + boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter, Review comment: Too many parameters. ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data Review comment: age data? ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. + */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; Review comment: This can be final ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); Review comment: use `this` for all field assignments in the constructor ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.google.inject.Inject; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveLocationService; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.plugin.hive.LocationHandle; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.spi.connector.ConnectorSession; +import org.apache.hadoop.fs.Path; + +public class CarbonDataLocationService extends HiveLocationService { + + private final HdfsEnvironment hdfsEnvironment; + + @Inject + public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) { + super(hdfsEnvironment); + this.hdfsEnvironment = hdfsEnvironment; + } + + @Override + public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, + ConnectorSession session, String schemaName, String tableName) { + // TODO: check and make it compatible for cloud scenario Review comment: What is missing here?
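On the constructor comment above, a minimal sketch of how the opening of the constructor could look with every field write qualified by `this` and the output path validated and derived exactly once. This is an illustrative excerpt using only names from the diff, not a complete replacement; the rest of the initialization would follow unchanged:

    public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
        Properties properties, JobConf configuration, TypeManager typeManager) throws SerDeException {
      // Validate the caller-supplied path, then derive the effective output path from the
      // table location a single time instead of overwriting the field three times.
      requireNonNull(outPutPath, "path is null");
      this.outPutPath = new Path(properties.getProperty("location"));
      this.configuration = requireNonNull(configuration, "conf is null");
      List<String> columnNames = Arrays.asList(
          properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
      this.fieldCount = columnNames.size();
      this.serDe = new CarbonHiveSerDe();
      this.serDe.initialize(configuration, properties);
      this.tableInspector = (ArrayWritableObjectInspector) this.serDe.getObjectInspector();
      // ... remaining initialization as in the diff ...
    }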
########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); Review comment: do not assign this.outPutPath repeatedly ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.MapredCarbonOutputCommitter; +import org.apache.carbondata.hive.util.HiveCarbonUtil; +import org.apache.carbondata.presto.impl.CarbonTableConfig; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveInsertTableHandle; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.MetastoreUtil; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.plugin.hive.security.AccessControlMetadata; +import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider; +import io.prestosql.plugin.hive.util.ConfigurationUtils; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTableHandle; +import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.statistics.ComputedStatistics; +import io.prestosql.spi.type.TypeManager; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; + +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.log4j.Logger; +import org.joda.time.DateTimeZone; + +public class CarbonDataMetaData extends HiveMetadata { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataMetaData.class.getName()); + + private HdfsEnvironment hdfsEnvironment; + private SemiTransactionalHiveMetastore metastore; + private MapredCarbonOutputCommitter carbonOutputCommitter; + private JobContextImpl jobContext; + + public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone, + boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled, Review comment: Too many parameters, should create a parameter object ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import com.google.inject.Inject; +import io.prestosql.plugin.hive.FileFormatDataSourceStats; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveFileWriterFactory; +import io.prestosql.plugin.hive.NodeVersion; +import io.prestosql.plugin.hive.metastore.StorageFormat; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.type.TypeManager; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.mapred.JobConf; + +import static java.util.Objects.requireNonNull; + +public class CarbonDataFileWriterFactory implements HiveFileWriterFactory { + + private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; + private final NodeVersion nodeVersion; + private final FileFormatDataSourceStats stats; + + @Inject + public CarbonDataFileWriterFactory(HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + NodeVersion nodeVersion, FileFormatDataSourceStats stats) { + this(typeManager, hdfsEnvironment, nodeVersion, stats); + } + + public CarbonDataFileWriterFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvironment, + NodeVersion nodeVersion, + FileFormatDataSourceStats stats) + { Review comment: please fix the formatting of lines 52 to 55 ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.presto; + +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.inject.Inject; +import io.airlift.event.client.EventClient; +import io.airlift.json.JsonCodec; +import io.airlift.units.DataSize; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveConfig; +import io.prestosql.plugin.hive.HiveFileWriterFactory; +import io.prestosql.plugin.hive.HivePageSink; +import io.prestosql.plugin.hive.HivePageSinkProvider; +import io.prestosql.plugin.hive.HiveSessionProperties; +import io.prestosql.plugin.hive.HiveWritableTableHandle; +import io.prestosql.plugin.hive.HiveWriterStats; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.OrcFileWriterFactory; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.metastore.HiveMetastore; +import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider; +import io.prestosql.plugin.hive.metastore.SortingColumn; +import io.prestosql.spi.NodeManager; +import io.prestosql.spi.PageIndexerFactory; +import io.prestosql.spi.PageSorter; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorPageSink; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTransactionHandle; +import io.prestosql.spi.type.TypeManager; + +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newFixedThreadPool; + +public class CarbonDataPageSinkProvider extends HivePageSinkProvider { + + private final Set<HiveFileWriterFactory> fileWriterFactories; + private final HdfsEnvironment hdfsEnvironment; + private final PageSorter pageSorter; + private final HiveMetastore metastore; + private final PageIndexerFactory pageIndexerFactory; + private final TypeManager typeManager; + private final int maxOpenPartitions; + private final int maxOpenSortFiles; + private final DataSize writerSortBufferSize; + private final boolean immutablePartitions; + private final LocationService locationService; + private final ListeningExecutorService writeVerificationExecutor; + private final JsonCodec<PartitionUpdate> partitionUpdateCodec; + private final NodeManager nodeManager; + private final EventClient eventClient; + private final HiveSessionProperties hiveSessionProperties; + private final HiveWriterStats hiveWriterStats; + private final OrcFileWriterFactory orcFileWriterFactory; + private final long perTransactionMetastoreCacheMaximumSize; + + @Inject + public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory> fileWriterFactories, + HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore metastore, Review comment: Too many parameters, should create a parameter object ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { + configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel); + } + try { + boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT); + Object writer = + Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance(); + recordWriter = ((MapredCarbonOutputFormat<?>) writer) + .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress, + properties, Reporter.NULL); + } catch (Exception e) { + LOG.error("error while initializing writer", e); + throw new RuntimeException("writer class not found"); + } + } + + @Override public long getWrittenBytes() { + if (isCommitDone) { + try { + return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return 0; + } + + @Override public long getSystemMemoryUsage() { + return 0; + } + + @Override public void appendRows(Page dataPage) { + for (int position = 0; position < dataPage.getPositionCount(); position++) { + appendRow(dataPage, position); + } + } + + public void appendRow(Page dataPage, int position) { Review comment: why is it public? 
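On the visibility question above: in the diff the only caller of `appendRow` is `appendRows` in the same class, so the method could be narrowed to private, assuming no external caller is planned. A sketch with the body taken from the diff and only the access modifier changed:

    @Override
    public void appendRows(Page dataPage) {
      for (int position = 0; position < dataPage.getPositionCount(); position++) {
        appendRow(dataPage, position);
      }
    }

    // Narrowed from public: within this diff the method is only invoked from appendRows.
    private void appendRow(Page dataPage, int position) {
      for (int field = 0; field < fieldCount; field++) {
        Block block = dataPage.getBlock(field);
        if (block.isNull(position)) {
          // A null cell clears the corresponding struct field before the row is serialized.
          tableInspector.setStructFieldData(row, structFields.get(field), null);
        } else {
          setters[field].setField(block, position);
        }
      }
      try {
        recordWriter.write(serDe.serialize(row, tableInspector));
      } catch (SerDeException | IOException e) {
        throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
      }
    }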
########## File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.integrationtest + +import java.io.File +import java.util +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.presto.server.PrestoServer +import org.apache.carbondata.presto.util.CarbonDataStoreCreator + +class PrestoInsertIntoTableTestCase extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach { + + private val logger = LogServiceFactory + .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName) + + private val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + private val storePath = s"$rootPath/integration/presto/target/store" + private val systemPath = s"$rootPath/integration/presto/target/system" + private val prestoServer = new PrestoServer + + override def beforeAll: Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + "Presto") + val map = new util.HashMap[String, String]() + map.put("hive.metastore", "file") + map.put("hive.metastore.catalog.dir", s"file://$storePath") + map.put("hive.allow-drop-table", "true") + prestoServer.startServer("testdb", map) + prestoServer.execute("drop schema if exists testdb") + prestoServer.execute("create schema testdb") + } + + override protected def beforeEach(): Unit = { + val query = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') " + createTable(query, "testdb", "testtable") + } + + private def createTable(query: String, databaseName: String, tableName: String): Unit = { + prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}") + 
prestoServer.execute(query) + logger.info("Creating The Carbon Store") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier(databaseName, tableName) + CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true) + logger.info(s"\nCarbon store is created at location: $storePath") + } + + private def getAbsoluteIdentifier(dbName: String, + tableName: String) = { + val absoluteTableIdentifier = AbsoluteTableIdentifier.from( + storePath + "/" + dbName + "/" + tableName, + new CarbonTableIdentifier(dbName, + tableName, + UUID.randomUUID().toString)) + absoluteTableIdentifier + } + + test("test insert with different storage format names") { + val query1 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') " + val query2 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') " + val query3 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='ORG.APACHE.CARBONDATA.FORMAT') " + createTable(query1, "testdb", "testtable") + createTable(query2, "testdb", "testtable") + createTable(query3, "testdb", "testtable") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0") + assert(FileFactory.getCarbonFile(segmentPath).isFileExist) + } + + test("test insert into one segment and check folder structure") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val tablePath = carbonTable.getTablePath + val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0") + val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1") + val segment0 = FileFactory.getCarbonFile(segment0Path) + assert(segment0.isFileExist) + assert(segment0.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) || + file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) + } + }).length == 2) + val segment1 = FileFactory.getCarbonFile(segment1Path) + assert(segment1.isFileExist) + assert(segment1.listFiles(new CarbonFileFilter { + override 
def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) || + file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) + } + }).length == 2) + val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath) + assert(FileFactory.getCarbonFile(segmentsPath).isFileExist && FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2) + val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath) + FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE) + } + }) + } + + test("test insert into many segments and check segment count and data count") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath) + assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8) + val actualResult1: List[Map[String, Any]] = prestoServer + .executeQuery("select count(*) AS RESULT from testdb.testtable") + val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4)) + assert(actualResult1.equals(expectedResult1)) + // filter query + val actualResult2: List[Map[String, Any]] = prestoServer + .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp '1998-12-16 10:12:09'") + val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2)) + assert(actualResult2.equals(expectedResult2)) + } + + test("test if the table status contains the segment file name for each load") { + prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)") + val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable") + val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier) + val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) + ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment => + val loadMetadataDetails = segment.getLoadMetadataDetails + assert(loadMetadataDetails.getSegmentFile != null) + } + } + Review comment: To test transactional write, you can add a testcase to query the table while inserting data. 
In that case, query result should not change before insertion success ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { + configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel); + } + try { + boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT); + Object writer = + Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance(); + recordWriter = ((MapredCarbonOutputFormat<?>) writer) + .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress, + properties, Reporter.NULL); + } catch (Exception e) { + LOG.error("error while initializing writer", e); + throw new RuntimeException("writer class not found"); + } + } + + @Override public long getWrittenBytes() { + if (isCommitDone) { + try { + return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return 0; + } + + @Override public long getSystemMemoryUsage() { + return 0; + } + + @Override public void appendRows(Page dataPage) { + for (int position = 0; position < dataPage.getPositionCount(); position++) { + appendRow(dataPage, position); + } + } + + public void appendRow(Page dataPage, int position) { + for (int field = 0; field < fieldCount; field++) { + Block block = dataPage.getBlock(field); + if (block.isNull(position)) { + tableInspector.setStructFieldData(row, structFields.get(field), null); + } else { + setters[field].setField(block, position); + } + } + + try { + 
recordWriter.write(serDe.serialize(row, tableInspector)); + } catch (SerDeException | IOException e) { + throw new PrestoException(HIVE_WRITER_DATA_ERROR, e); + } + } + + @Override public void commit() { + try { + recordWriter.close(false); + } catch (Exception ex) { + LOG.error("Error while closing the record writer", ex); + throw new RuntimeException(ex); + } + isCommitDone = true; + } + + @Override public void rollback() { + Review comment: should throw UnsupportedOperationException here ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.MapredCarbonOutputCommitter; +import org.apache.carbondata.hive.util.HiveCarbonUtil; +import org.apache.carbondata.presto.impl.CarbonTableConfig; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveInsertTableHandle; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.MetastoreUtil; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.plugin.hive.security.AccessControlMetadata; +import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider; +import io.prestosql.plugin.hive.util.ConfigurationUtils; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTableHandle; +import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.statistics.ComputedStatistics; +import io.prestosql.spi.type.TypeManager; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; + +import org.apache.hadoop.mapred.TaskAttemptContextImpl; 
+import org.apache.log4j.Logger; +import org.joda.time.DateTimeZone; + +public class CarbonDataMetaData extends HiveMetadata { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataMetaData.class.getName()); + + private HdfsEnvironment hdfsEnvironment; + private SemiTransactionalHiveMetastore metastore; + private MapredCarbonOutputCommitter carbonOutputCommitter; + private JobContextImpl jobContext; + + public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone, + boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled, + boolean createsOfNonManagedTablesEnabled, TypeManager typeManager, + LocationService locationService, + io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec, + TypeTranslator typeTranslator, String prestoVersion, + HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata accessControlMetadata) { + super(metastore, hdfsEnvironment, partitionManager, timeZone, allowCorruptWritesForTesting, + true, createsOfNonManagedTablesEnabled, typeManager, + locationService, partitionUpdateCodec, typeTranslator, prestoVersion, + hiveStatisticsProvider, accessControlMetadata); + this.hdfsEnvironment = hdfsEnvironment; + this.metastore = metastore; + } + + @Override + public CarbonDataInsertTableHandle beginInsert(ConnectorSession session, + ConnectorTableHandle tableHandle) { + HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session, tableHandle); + SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName(); + Optional<Table> table = + this.metastore.getTable(tableName.getSchemaName(), tableName.getTableName()); + Path outputPath = + new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath()); + JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment + .getConfiguration( + new HdfsEnvironment.HdfsContext(session, hiveInsertTableHandle.getSchemaName(), + hiveInsertTableHandle.getTableName()), + new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath()))); + jobConf.set("location", outputPath.toString()); + Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get()); + try { + CarbonLoadModel carbonLoadModel = + HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf); + + CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel); + } catch (IOException ex) { + LOG.error("Error while creating carbon load model", ex); + throw new RuntimeException(ex); + } + try { + carbonOutputCommitter = new MapredCarbonOutputCommitter(); + jobContext = new JobContextImpl(jobConf, new JobID()); + carbonOutputCommitter.setupJob(jobContext); + ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf); + } catch (IOException e) { + LOG.error("error setting the output committer", e); + throw new RuntimeException("error setting the output committer"); + } + return new CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(), + hiveInsertTableHandle.getTableName(), + hiveInsertTableHandle.getInputColumns(), + hiveInsertTableHandle.getPageSinkMetadata(), + hiveInsertTableHandle.getLocationHandle(), + hiveInsertTableHandle.getBucketProperty(), hiveInsertTableHandle.getTableStorageFormat(), + hiveInsertTableHandle.getPartitionStorageFormat(), + ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL, + jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL))); + } + + @Override + public 
Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, + ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, + Collection<ComputedStatistics> computedStatistics) { + Optional<ConnectorOutputMetadata> connectorOutputMetadata = + super.finishInsert(session, insertHandle, fragments, computedStatistics); + try { + carbonOutputCommitter.commitJob(jobContext); + } catch (IOException e) { + LOG.error("Error occurred while committing the insert job.", e); + throw new RuntimeException(e); Review comment: If exception is thrown here, we have already called super.finishInsert. So is it better to call super.finishInsert after commitJob? ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. + */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( Review comment: move `ImmutableList.copyOf(` to next and make format better ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.presto; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import io.prestosql.plugin.hive.HiveBucketProperty; +import io.prestosql.plugin.hive.HiveColumnHandle; +import io.prestosql.plugin.hive.HiveInsertTableHandle; +import io.prestosql.plugin.hive.HiveStorageFormat; +import io.prestosql.plugin.hive.LocationHandle; +import io.prestosql.plugin.hive.metastore.HivePageSinkMetadata; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; + +import static java.util.Objects.requireNonNull; + +public class CarbonDataInsertTableHandle extends HiveInsertTableHandle implements + ConnectorInsertTableHandle { + + private final Map<String, String> additionalConf; + + @JsonCreator public CarbonDataInsertTableHandle(@JsonProperty("schemaName") String schemaName, Review comment: move first parameter to next line ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java ########## @@ -214,4 +226,8 @@ private static void setCarbonEnum() throws Exception { hiveStorageFormats[src.length] = (HiveStorageFormat) instance; values.set(null, hiveStorageFormats); } -} \ No newline at end of file + + @Override public ConnectorHandleResolver getHandleResolver() { Review comment: move Override to previous line ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
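The two style nits above ask for the same layout convention. A minimal before/after sketch of what is being requested, using a hypothetical two-field handle rather than the real CarbonDataInsertTableHandle signature:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ExampleInsertHandle {
  private final String schemaName;
  private final String tableName;

  // Requested style: the annotation sits on its own line and the first
  // parameter starts on the line after the constructor name.
  @JsonCreator
  public ExampleInsertHandle(
      @JsonProperty("schemaName") String schemaName,
      @JsonProperty("tableName") String tableName) {
    this.schemaName = schemaName;
    this.tableName = tableName;
  }

  // Same convention for methods: "@Override" goes on the previous line,
  // not inline with the signature.
  @Override
  public String toString() {
    return schemaName + "." + tableName;
  }
}
```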
akashrn5 commented on a change in pull request #3875: URL: https://github.com/apache/carbondata/pull/3875#discussion_r491866753 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2525,4 +2525,9 @@ private CarbonCommonConstants() { * property which defines the presto query default value */ public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false"; + + /** + * property to send load model from coordinator to worker in presto + */ + public static final String CARBON_PRESTO_LOAD_MODEL = "presto.carbondata.encoded.loadmodel"; Review comment: its not configurable by user. You are right. I have moved the constant to `CarbonTableConfig` in presto module and renamed it to `carbondata.presto.encoded.loadmodel` ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2525,4 +2525,9 @@ private CarbonCommonConstants() { * property which defines the presto query default value */ public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false"; + + /** + * property to send load model from coordinator to worker in presto Review comment: not a user config, same as above comment. Moved to presto module. ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( + inputColumnNames.stream().map(tableInspector::getStructFieldRef) + .collect(toImmutableList())); + + row = tableInspector.create(); + + setters = new HiveWriteUtils.FieldSetter[structFields.size()]; + for (int i = 0; i < setters.length; i++) { + setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i), + fileColumnTypes.get(structFields.get(i).getFieldID())); + } + String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL); + if (StringUtils.isNotEmpty(encodedLoadModel)) { Review comment: `encodedLoadModel` value will be a string, basically the serialized load model, we use this property just to set the encoded load model prepared in `setUpJob()` to conf, so that it will be transferred to all the workers from coordinator. ########## File path: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java ########## @@ -40,6 +40,12 @@ private String endPoint; private String pushRowFilter; + /** + * Property to send load model from coordinator to worker in presto. This is internal constant + * and not exposed to user. Review comment: As said in the above comment, its same we use this as property name to send the load model from coordinator to worker. So its value will be the load model prepared for each load. ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.google.inject.Inject; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveLocationService; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.plugin.hive.LocationHandle; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.spi.connector.ConnectorSession; +import org.apache.hadoop.fs.Path; + +public class CarbonDataLocationService extends HiveLocationService { + + private final HdfsEnvironment hdfsEnvironment; + + @Inject + public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) { + super(hdfsEnvironment); + this.hdfsEnvironment = hdfsEnvironment; + } + + @Override + public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, + ConnectorSession session, String schemaName, String tableName) { + // TODO: check and make it compatible for cloud scenario Review comment: Actually, if we don't override these methods, Presto gives each writer a temporary write path, much like the temp path carbon itself uses during writing, and that conflicts with our write flow. So I have overridden them and made the write path and the target path the same. In the Presto super class, for S3 or any encrypted store, no temporary write (staging) path is created. So basically we need to test this once on S3 or OBS and remove the TODO if it works fine; that is why I added it. Since I didn't have an S3/OBS environment I couldn't test that, but I tested on HDFS. You can refer to https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L55 and https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L76 ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.MapredCarbonOutputCommitter; +import org.apache.carbondata.hive.util.HiveCarbonUtil; +import org.apache.carbondata.presto.impl.CarbonTableConfig; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveInsertTableHandle; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.MetastoreUtil; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.plugin.hive.security.AccessControlMetadata; +import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider; +import io.prestosql.plugin.hive.util.ConfigurationUtils; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTableHandle; +import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.statistics.ComputedStatistics; +import io.prestosql.spi.type.TypeManager; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; + +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.log4j.Logger; +import org.joda.time.DateTimeZone; + +public class CarbonDataMetaData extends HiveMetadata { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataMetaData.class.getName()); + + private HdfsEnvironment hdfsEnvironment; + private SemiTransactionalHiveMetastore metastore; + private MapredCarbonOutputCommitter carbonOutputCommitter; + private JobContextImpl jobContext; + + public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone, + boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled, + boolean createsOfNonManagedTablesEnabled, TypeManager typeManager, + LocationService locationService, + io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec, + TypeTranslator typeTranslator, String prestoVersion, + HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata accessControlMetadata) { + super(metastore, hdfsEnvironment, partitionManager, timeZone, allowCorruptWritesForTesting, + true, createsOfNonManagedTablesEnabled, typeManager, + locationService, partitionUpdateCodec, typeTranslator, prestoVersion, + hiveStatisticsProvider, accessControlMetadata); + this.hdfsEnvironment = hdfsEnvironment; + this.metastore = metastore; + } + + @Override + public CarbonDataInsertTableHandle beginInsert(ConnectorSession session, + 
ConnectorTableHandle tableHandle) { + HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session, tableHandle); + SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName(); + Optional<Table> table = + this.metastore.getTable(tableName.getSchemaName(), tableName.getTableName()); + Path outputPath = + new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath()); + JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment + .getConfiguration( + new HdfsEnvironment.HdfsContext(session, hiveInsertTableHandle.getSchemaName(), + hiveInsertTableHandle.getTableName()), + new Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath()))); + jobConf.set("location", outputPath.toString()); + Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get()); + try { + CarbonLoadModel carbonLoadModel = + HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf); + + CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel); + } catch (IOException ex) { + LOG.error("Error while creating carbon load model", ex); + throw new RuntimeException(ex); + } + try { + carbonOutputCommitter = new MapredCarbonOutputCommitter(); + jobContext = new JobContextImpl(jobConf, new JobID()); + carbonOutputCommitter.setupJob(jobContext); + ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf); + } catch (IOException e) { + LOG.error("error setting the output committer", e); + throw new RuntimeException("error setting the output committer"); + } + return new CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(), + hiveInsertTableHandle.getTableName(), + hiveInsertTableHandle.getInputColumns(), + hiveInsertTableHandle.getPageSinkMetadata(), + hiveInsertTableHandle.getLocationHandle(), + hiveInsertTableHandle.getBucketProperty(), hiveInsertTableHandle.getTableStorageFormat(), + hiveInsertTableHandle.getPartitionStorageFormat(), + ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL, + jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL))); + } + + @Override + public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, + ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, + Collection<ComputedStatistics> computedStatistics) { + Optional<ConnectorOutputMetadata> connectorOutputMetadata = + super.finishInsert(session, insertHandle, fragments, computedStatistics); + try { + carbonOutputCommitter.commitJob(jobContext); + } catch (IOException e) { + LOG.error("Error occurred while committing the insert job.", e); + throw new RuntimeException(e); Review comment: Actually, if you look at super.finishInsert, it doesn't do much for our transactional case; carbon reads are driven by our commit job, basically the table status it writes. So the current order is fine here. There was also another problem I faced with the other ordering, but since this was developed some months back I can't remember the exact issue. With respect to carbon, though, this is fine. ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.MapredCarbonOutputCommitter; +import org.apache.carbondata.hive.util.HiveCarbonUtil; +import org.apache.carbondata.presto.impl.CarbonTableConfig; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveInsertTableHandle; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.MetastoreUtil; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.metastore.Table; +import io.prestosql.plugin.hive.security.AccessControlMetadata; +import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider; +import io.prestosql.plugin.hive.util.ConfigurationUtils; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTableHandle; +import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.statistics.ComputedStatistics; +import io.prestosql.spi.type.TypeManager; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; + +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.log4j.Logger; +import org.joda.time.DateTimeZone; + +public class CarbonDataMetaData extends HiveMetadata { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataMetaData.class.getName()); + + private HdfsEnvironment hdfsEnvironment; + private SemiTransactionalHiveMetastore metastore; + private MapredCarbonOutputCommitter carbonOutputCommitter; + private JobContextImpl jobContext; + + public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone, + boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled, Review comment: Actually, the super class takes that many parameters in its constructor, so I followed the same pattern; and since it is called from only one place, it should be fine?
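The delegation pattern being defended in the reply above, reduced to a minimal sketch with hypothetical BaseMetadata/CustomMetadata types (not the real Presto classes): the subclass keeps the super class's wide parameter list, forwards everything, and relies on the single Guice injection point so the signature is never repeated at call sites.

```java
import com.google.inject.Inject;

class BaseMetadata {
  private final String metastoreUri;
  private final boolean writesEnabled;

  BaseMetadata(String metastoreUri, boolean writesEnabled) {
    // The real super class takes many more parameters; two stand in for them here.
    this.metastoreUri = metastoreUri;
    this.writesEnabled = writesEnabled;
  }
}

class CustomMetadata extends BaseMetadata {
  // Mirrors the super constructor and only forwards. Because Guice builds this
  // from one binding site, the wide signature is never hand-written by callers.
  @Inject
  CustomMetadata(String metastoreUri, boolean writesEnabled) {
    super(metastoreUri, writesEnabled);
  }
}
```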
########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.inject.Inject; +import io.airlift.event.client.EventClient; +import io.airlift.json.JsonCodec; +import io.airlift.units.DataSize; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveConfig; +import io.prestosql.plugin.hive.HiveFileWriterFactory; +import io.prestosql.plugin.hive.HivePageSink; +import io.prestosql.plugin.hive.HivePageSinkProvider; +import io.prestosql.plugin.hive.HiveSessionProperties; +import io.prestosql.plugin.hive.HiveWritableTableHandle; +import io.prestosql.plugin.hive.HiveWriterStats; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.OrcFileWriterFactory; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.metastore.HiveMetastore; +import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider; +import io.prestosql.plugin.hive.metastore.SortingColumn; +import io.prestosql.spi.NodeManager; +import io.prestosql.spi.PageIndexerFactory; +import io.prestosql.spi.PageSorter; +import io.prestosql.spi.connector.ConnectorInsertTableHandle; +import io.prestosql.spi.connector.ConnectorPageSink; +import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.ConnectorTransactionHandle; +import io.prestosql.spi.type.TypeManager; + +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newFixedThreadPool; + +public class CarbonDataPageSinkProvider extends HivePageSinkProvider { + + private final Set<HiveFileWriterFactory> fileWriterFactories; + private final HdfsEnvironment hdfsEnvironment; + private final PageSorter pageSorter; + private final HiveMetastore metastore; + private final PageIndexerFactory pageIndexerFactory; + private final TypeManager typeManager; + private final int maxOpenPartitions; + private final int maxOpenSortFiles; + private final DataSize writerSortBufferSize; + private final boolean immutablePartitions; + private final LocationService 
locationService; + private final ListeningExecutorService writeVerificationExecutor; + private final JsonCodec<PartitionUpdate> partitionUpdateCodec; + private final NodeManager nodeManager; + private final EventClient eventClient; + private final HiveSessionProperties hiveSessionProperties; + private final HiveWriterStats hiveWriterStats; + private final OrcFileWriterFactory orcFileWriterFactory; + private final long perTransactionMetastoreCacheMaximumSize; + + @Inject + public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory> fileWriterFactories, + HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore metastore, Review comment: Actually, the super class takes that many parameters in its constructor, so I followed the same pattern; and since it is called from only one place, it should be fine? This constructor is also driven by the Inject framework. ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.presto; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +import com.google.inject.Inject; +import io.airlift.concurrent.BoundedExecutor; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; +import io.prestosql.plugin.hive.ForHive; +import io.prestosql.plugin.hive.HdfsEnvironment; +import io.prestosql.plugin.hive.HiveConfig; +import io.prestosql.plugin.hive.HiveMetadata; +import io.prestosql.plugin.hive.HiveMetadataFactory; +import io.prestosql.plugin.hive.HivePartitionManager; +import io.prestosql.plugin.hive.LocationService; +import io.prestosql.plugin.hive.NodeVersion; +import io.prestosql.plugin.hive.PartitionUpdate; +import io.prestosql.plugin.hive.TypeTranslator; +import io.prestosql.plugin.hive.metastore.CachingHiveMetastore; +import io.prestosql.plugin.hive.metastore.HiveMetastore; +import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore; +import io.prestosql.plugin.hive.security.AccessControlMetadataFactory; +import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider; +import io.prestosql.spi.type.TypeManager; +import org.joda.time.DateTimeZone; + +public class CarbonMetadataFactory extends HiveMetadataFactory { + + private static final Logger log = Logger.get(HiveMetadataFactory.class); + private final boolean allowCorruptWritesForTesting; + private final boolean skipDeletionForAlter; + private final boolean skipTargetCleanupOnRollback; + private final boolean writesToNonManagedTablesEnabled = true; + private final boolean createsOfNonManagedTablesEnabled; + private final long perTransactionCacheMaximumSize; + private final HiveMetastore metastore; + private final HdfsEnvironment hdfsEnvironment; + private final HivePartitionManager partitionManager; + private final DateTimeZone timeZone; + private final TypeManager typeManager; + private final LocationService locationService; + private final BoundedExecutor renameExecution; + private final TypeTranslator typeTranslator; + private final String prestoVersion; + private final AccessControlMetadataFactory accessControlMetadataFactory; + private final JsonCodec partitionUpdateCodec; + + @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, + @ForHive ExecutorService executorService, TypeManager typeManager, + LocationService locationService, JsonCodec<PartitionUpdate> partitionUpdateCodec, + TypeTranslator typeTranslator, NodeVersion nodeVersion, + AccessControlMetadataFactory accessControlMetadataFactory) { + this(metastore, hdfsEnvironment, partitionManager, hiveConfig.getDateTimeZone(), + hiveConfig.getMaxConcurrentFileRenames(), hiveConfig.getAllowCorruptWritesForTesting(), + hiveConfig.isSkipDeletionForAlter(), hiveConfig.isSkipTargetCleanupOnRollback(), + hiveConfig.getWritesToNonManagedTablesEnabled(), + hiveConfig.getCreatesOfNonManagedTablesEnabled(), + hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager, locationService, + partitionUpdateCodec, executorService, typeTranslator, nodeVersion.toString(), + accessControlMetadataFactory); + } + + public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, + HivePartitionManager partitionManager, DateTimeZone timeZone, int maxConcurrentFileRenames, + boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter, Review comment: Actually, the super class takes that many parameters in its constructor, so I followed the same pattern; and since it is
called from only one place, it should be fine? This constructor is also driven by the Inject framework. ########## File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ########## @@ -76,7 +76,7 @@ // TODO Move dictionary generator which is coded in spark to MR framework. public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> { - protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; + public static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; Review comment: done ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java ########## @@ -52,25 +53,30 @@ @Override public void setupJob(JobContext jobContext) throws IOException { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); - String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); Random random = new Random(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context = new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID); - CarbonLoadModel carbonLoadModel = - HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); - CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + CarbonLoadModel carbonLoadModel = null; + String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL); + if (encodedString != null) { Review comment: Actually, this is refactoring of the base code; I have added a comment. @kunal642 please check whether the comment is proper or whether I need to modify it.
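Condensing the setupJob diff above, the load-model handoff the replies keep referring to is just this: the coordinator serializes the CarbonLoadModel into the JobConf under CarbonTableOutputFormat.LOAD_MODEL, and each worker decodes it, falling back to rebuilding the model from the Hive schema in the conf when the property is absent. A sketch using the names from the diff (the wrapper class and exact import paths are my guesses, not code from the PR; the real setupJob also wires up the task attempt context):

```java
import java.io.IOException;

import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hive.util.HiveCarbonUtil;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

import org.apache.hadoop.mapred.JobContext;

public final class LoadModelHandoffSketch {

  private LoadModelHandoffSketch() {
  }

  // Worker-side resolution of the load model, mirroring the refactored setupJob above.
  public static CarbonLoadModel resolve(JobContext jobContext) throws IOException {
    String encoded = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
    if (encoded != null) {
      // Fast path: the coordinator already prepared and serialized the model.
      return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encoded);
    }
    // Plain Hive write path: rebuild the model from the Hive schema in the conf.
    return HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
  }
}
```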
########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java ########## @@ -52,25 +53,30 @@ @Override public void setupJob(JobContext jobContext) throws IOException { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); - String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); Random random = new Random(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context = new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID); - CarbonLoadModel carbonLoadModel = - HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); - CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + CarbonLoadModel carbonLoadModel = null; + String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL); + if (encodedString != null) { + carbonLoadModel = + (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString); + } + if (null == carbonLoadModel) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration()); + String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV); + carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration()); + CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel); + String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL); + jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr); Review comment: added comment for the base code. @kunal642 please check whether the comment is proper or not. ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java ########## @@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx } String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath(); TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id")); + if (taskAttemptID == null) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); Review comment: done ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data Review comment: its page data. changed ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. + */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; Review comment: done ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. + */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); Review comment: done ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); Review comment: done ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hive.CarbonHiveSerDe; +import org.apache.carbondata.hive.MapredCarbonOutputFormat; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.google.common.collect.ImmutableList; +import io.prestosql.plugin.hive.HiveFileWriter; +import io.prestosql.plugin.hive.HiveType; +import io.prestosql.plugin.hive.HiveWriteUtils; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; + +/** + * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data + * sent from presto. 
+ */ +public class CarbonDataFileWriter implements HiveFileWriter { + + private static final Logger LOG = + LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName()); + + private final JobConf configuration; + private Path outPutPath; + private final FileSinkOperator.RecordWriter recordWriter; + private final CarbonHiveSerDe serDe; + private final int fieldCount; + private final Object row; + private final SettableStructObjectInspector tableInspector; + private final List<StructField> structFields; + private final HiveWriteUtils.FieldSetter[] setters; + + private boolean isCommitDone; + + public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties, + JobConf configuration, TypeManager typeManager) throws SerDeException { + this.outPutPath = requireNonNull(outPutPath, "path is null"); + this.outPutPath = new Path(properties.getProperty("location")); + outPutPath = new Path(properties.getProperty("location")); + this.configuration = requireNonNull(configuration, "conf is null"); + List<String> columnNames = Arrays + .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA)); + List<Type> fileColumnTypes = + HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream() + .map(hiveType -> hiveType.getType(typeManager)).collect(toList()); + fieldCount = columnNames.size(); + serDe = new CarbonHiveSerDe(); + serDe.initialize(configuration, properties); + tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector(); + + structFields = ImmutableList.copyOf( Review comment: done ########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+  public void appendRow(Page dataPage, int position) {

Review comment: changed

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      // keep the cause instead of the misleading "writer class not found" message
+      throw new RuntimeException("error while initializing the record writer", e);
+    }
+  }
+
+  @Override public long getWrittenBytes() {
+    if (isCommitDone) {
+      try {
+        return outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return 0;
+  }
+
+  @Override public long getSystemMemoryUsage() {
+    return 0;
+  }
+
+  @Override public void appendRows(Page dataPage) {
+    for (int position = 0; position < dataPage.getPositionCount(); position++) {
+      appendRow(dataPage, position);
+    }
+  }
+
+  public void appendRow(Page dataPage, int position) {
+    for (int field = 0; field < fieldCount; field++) {
+      Block block = dataPage.getBlock(field);
+      if (block.isNull(position)) {
+        tableInspector.setStructFieldData(row, structFields.get(field), null);
+      } else {
+        setters[field].setField(block, position);
+      }
+    }
+    try {
+      recordWriter.write(serDe.serialize(row, tableInspector));
+    } catch (SerDeException | IOException e) {
+      throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
+    }
+  }
+
+  @Override public void commit() {
+    try {
+      recordWriter.close(false);
+    } catch (Exception ex) {
+      LOG.error("Error while closing the record writer", ex);
+      throw new RuntimeException(ex);
+    }
+    isCommitDone = true;
+  }
+
+  @Override public void rollback() {

Review comment: modified
##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -0,0 +1,170 @@
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.util.CarbonDataStoreCreator
+
+class PrestoInsertIntoTableTestCase extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  // use this test class itself for the logger, not PrestoAllDataTypeTest
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoInsertIntoTableTestCase].getCanonicalName)
+
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val storePath = s"$rootPath/integration/presto/target/store"
+  private val systemPath = s"$rootPath/integration/presto/target/system"
+  private val prestoServer = new PrestoServer
+
+  override def beforeAll: Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+    map.put("hive.allow-drop-table", "true")
+    prestoServer.startServer("testdb", map)
+    prestoServer.execute("drop schema if exists testdb")
+    prestoServer.execute("create schema testdb")
+  }
+
+  override protected def beforeEach(): Unit = {
+    val query = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar, salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    createTable(query, "testdb", "testtable")
+  }
+
+  private def createTable(query: String, databaseName: String, tableName: String): Unit = {
+    prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}")
+    prestoServer.execute(query)
+    logger.info("Creating The Carbon Store")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier(databaseName, tableName)
+    CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true)
+    logger.info(s"\nCarbon store is created at location: $storePath")
+  }
+
+  private def getAbsoluteIdentifier(dbName: String, tableName: String) = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      storePath + "/" + dbName + "/" + tableName,
+      new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString))
+    absoluteTableIdentifier
+  }
+
+  test("test insert with different storage format names") {
+    val query1 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar, salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    val query2 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar, salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') "
+    val query3 = "create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar, salary decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='ORG.APACHE.CARBONDATA.FORMAT') "
+    createTable(query1, "testdb", "testtable")
+    createTable(query2, "testdb", "testtable")
+    createTable(query3, "testdb", "testtable")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
+    assert(FileFactory.getCarbonFile(segmentPath).isFileExist)
+  }
+
+  test("test insert into one segment and check folder structure") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val tablePath = carbonTable.getTablePath
+    val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0")
+    val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1")
+    val segment0 = FileFactory.getCarbonFile(segment0Path)
+    assert(segment0.isFileExist)
+    assert(segment0.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segment1 = FileFactory.getCarbonFile(segment1Path)
+    assert(segment1.isFileExist)
+    assert(segment1.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+    assert(FileFactory.getCarbonFile(segmentsPath).isFileExist &&
+           FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
+    // assert on the result so the table-status check actually verifies something
+    assert(FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE)
+      }
+    }).nonEmpty)
+  }
+
+  test("test insert into many segments and check segment count and data count") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+    val actualResult1: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable")
+    val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4))
+    assert(actualResult1.equals(expectedResult1))
+    // filter query
+    val actualResult2: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp '1998-12-16 10:12:09'")
+    val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+    assert(actualResult2.equals(expectedResult2))
+  }
+
+  test("test if the table status contains the segment file name for each load") {
+    prestoServer.execute("insert into testdb.testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint '23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+    ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment =>
+      val loadMetadataDetails = segment.getLoadMetadataDetails
+      assert(loadMetadataDetails.getSegmentFile != null)
+    }
+  }

Review comment: added

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]
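To see how the writer quoted in this review is meant to be used end to end, here is a minimal, hypothetical driver sketch; `outputPath`, `inputColumnNames`, `tableProperties`, `jobConf`, `typeManager`, and the `pages` source are illustrative assumptions, not code from this PR (the real loop lives in Presto's hive connector):

    // Illustrative only: construct the writer, feed it Pages, then commit.
    CarbonDataFileWriter writer = new CarbonDataFileWriter(
        outputPath, inputColumnNames, tableProperties, jobConf, typeManager);
    try {
      for (Page page : pages) {     // Pages produced by the INSERT query
        writer.appendRows(page);    // each position is materialized via the FieldSetters
      }
      writer.commit();              // closes the record writer and flushes the segment
    } catch (Exception e) {
      writer.rollback();            // best-effort cleanup of partially written data
      throw e;
    }

Note the ordering implied by the `isCommitDone` flag in the diff: `getWrittenBytes()` only reports a real file length after `commit()` has closed the record writer.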
CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-696026596

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]
CarbonDataQA1 commented on pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#issuecomment-704138259

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4312/

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]