[GitHub] [carbondata] ajantha-bhat opened a new pull request #3641: [WIP] support prestodb and prestosql


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392239358
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return multiBlockSplitList;
+  }
+
+  /**
+   * Returns list of partition specs to query based on the domain constraints
+   *
+   * @param constraints
+   * @param carbonTable
+   * @throws IOException
+   */
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    Set<PartitionSpec> partitionSpecs = new HashSet<>();
+    List<PartitionSpec> prunePartitions = new ArrayList();
 
 Review comment:
   ```suggestion
       List<PartitionSpec> prunePartitions = new ArrayList<>();
   ```
   please check all generics in this file.
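
As a rough illustration of the change being asked for here (a sketch with placeholder element types, not the PR's own classes), the raw-typed constructors quoted above would be written with the diamond operator like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the requested change, using placeholder element types
// instead of the PR's own classes: raw-typed constructors rewritten with the
// diamond operator so the compiler infers the type arguments and no unchecked
// warnings are produced.
public class DiamondOperatorSketch {
  public static void main(String[] args) {
    // was: new AtomicReference(new ConcurrentHashMap<>())
    AtomicReference<Map<String, String>> cache =
        new AtomicReference<>(new ConcurrentHashMap<>());

    // was: List<PartitionSpec> prunePartitions = new ArrayList();
    List<String> prunePartitions = new ArrayList<>();

    prunePartitions.add("partition=a");
    cache.get().put("schema.table", "cached-metadata");
    System.out.println(prunePartitions + " " + cache.get());
  }
}
```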


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392343082
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -688,5 +675,105 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>prestodb</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/prestosql</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/prestodb</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/prestodb</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>prestosql</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
 
 Review comment:
   Yes, this was intentional. Explained above.


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392343610
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -688,5 +675,105 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>prestodb</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
 
 Review comment:
   By default, spark2.3 and prestosql are active, so having two active-by-default profiles is intentional.
   If a user needs prestodb, they have to specify that profile explicitly, similar to spark2.4.


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392542105
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -375,9 +374,9 @@
       <version>2.5</version>
 
 Review comment:
   The group id is not `org.apache.commons`; it is `commons-lang`.
   Not sure why this dependency is needed. It is from the base code. Let me check.


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392544802
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
 
 Review comment:
   Yes, this cache is per query. Instead of reading the schema again and again, we infer it once, cache it, and use it for the complete lifecycle of the query.
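
As a minimal, self-contained sketch of that lookup flow (placeholder types, not CarbonTableCacheModel or the PR's other classes), the check/synchronize/re-check pattern used by parseCarbonMetadata looks roughly like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-query cache behaviour described above, with placeholder
// types: check the cache, and only if it is empty synchronize, re-check, and
// load once, so concurrent tasks in a worker share a single schema read.
public class PerQueryCacheSketch {
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  public String getOrLoad(String table) {
    String cached = cache.get(table);        // fast path: schema already parsed
    if (cached != null) {
      return cached;
    }
    synchronized (this) {                    // slow path: serialize the first load
      cached = cache.get(table);             // another thread may have filled it
      if (cached == null) {
        cached = "schema-of-" + table;       // stands in for reading/inferring the schema
        cache.put(table, cached);
      }
      return cached;
    }
  }

  public static void main(String[] args) {
    PerQueryCacheSketch reader = new PerQueryCacheSketch();
    System.out.println(reader.getOrLoad("default.t1"));
    System.out.println(reader.getOrLoad("default.t1"));  // served from the cache
  }
}
```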


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392545555
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
 
 Review comment:
   Yeah, it is not so efficient. It is from the base code; I have to analyze and optimize it later.
   Added a TODO.
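
One possible direction for that TODO, sketched with a hypothetical Split type rather than CarbonLocalInputSplit: group the splits and de-duplicate their locations in a single pass, instead of re-streaming each grouped list with flatMap/distinct afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical optimization sketch (placeholder Split type, not CarbonLocalInputSplit):
// build the groups and their distinct host locations in one pass over the splits.
public class BlockDistributionSketch {
  static class Split {
    final String segmentId;
    final String path;
    final String[] locations;
    Split(String segmentId, String path, String[] locations) {
      this.segmentId = segmentId;
      this.path = path;
      this.locations = locations;
    }
  }

  static Map<String, List<Split>> group(List<Split> splits,
      Map<String, Set<String>> locationsByGroup) {
    Map<String, List<Split>> grouped = new LinkedHashMap<>();
    for (Split split : splits) {
      // grouping key as in the quoted code (ROW_V1 start-offset handling omitted)
      String key = split.segmentId + split.path;
      grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(split);
      locationsByGroup.computeIfAbsent(key, k -> new LinkedHashSet<>())
          .addAll(Arrays.asList(split.locations));  // de-duplicate locations while grouping
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> locations = new LinkedHashMap<>();
    Map<String, List<Split>> grouped = group(Arrays.asList(
        new Split("0", "/part-0", new String[] {"host1", "host2"}),
        new Split("0", "/part-0", new String[] {"host2"})), locations);
    System.out.println(grouped.size() + " group(s), locations = " + locations);
  }
}
```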


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549159
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -375,9 +374,9 @@
       <version>2.5</version>
 
 Review comment:
   removed now


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549166
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
 
 Review comment:
   done


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549172
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for the Carbon reader; with this cache,
+   * the metadata of a table is read from the file system only once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference<>(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * Used by presto worker nodes to initialize the metadata cache of a table.
+   *
+   * @param table the schema name and table name.
+   * @return the cached metadata model of the table.
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Refresh the cached schema timestamp of the given table (when it is cached and
+   * transactional) from the last modified time of its schema file, and clear its
+   * datamaps when the cached entry is no longer valid.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // Multiple tasks can be launched concurrently in a worker, so this block must be synchronized.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return multiBlockSplitList;
+  }
+
+  /**
+   * Returns the list of partition specs to query, pruned by the given domain constraints.
+   *
+   * @param constraints the Presto tuple-domain constraints pushed down for the query
+   * @param carbonTable the carbon table whose partitions are pruned
+   * @param loadMetadataDetails the load (segment) metadata of the table
+   * @throws IOException if a segment file cannot be read
+   */
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    Set<PartitionSpec> partitionSpecs = new HashSet<>();
+    List<PartitionSpec> prunePartitions = new ArrayList<>();
+
+    for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+      SegmentFileStore segmentFileStore = null;
+      try {
+        segmentFileStore =
+            new SegmentFileStore(carbonTable.getTablePath(), loadMetadataDetail.getSegmentFile());
+        partitionSpecs.addAll(segmentFileStore.getPartitionSpecs());
+
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
 
 Review comment:
   done
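
The updateSchemaTables method in the quoted hunk keeps a cached table valid by comparing the cached schema timestamp against the schema file's last-modified time, and drops the datamaps when the entry goes stale. A self-contained illustration of that timestamp check follows; the names (CachedTable, isStale) are hypothetical and plain java.io.File stands in for CarbonFile.

import java.io.File;

// Hypothetical, simplified model of a timestamp-validated cache entry.
final class CachedTable {
  private final long cachedSchemaTime; // last-modified time seen when the schema was parsed
  private final Object parsedTable;    // stands in for the parsed CarbonTable

  CachedTable(long cachedSchemaTime, Object parsedTable) {
    this.cachedSchemaTime = cachedSchemaTime;
    this.parsedTable = parsedTable;
  }

  // The entry is stale once the schema file on disk is newer than what was parsed.
  boolean isStale(File schemaFile) {
    return schemaFile.lastModified() > cachedSchemaTime;
  }

  Object parsedTable() {
    return parsedTable;
  }
}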


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549175
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
 
 Review comment:
   done
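
The carbonCache field quoted above holds a ConcurrentHashMap behind an AtomicReference so that a worker parses a table's metadata only once; parseCarbonMetadata additionally re-checks the cache inside a synchronized block so concurrent tasks do not parse the same table twice. A minimal, self-contained sketch of that check / lock / re-check pattern, with hypothetical names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the caching pattern; not the connector's actual class.
final class MetadataCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();

  V getOrParse(K key, Function<K, V> parser) {
    V cached = cache.get(key);   // fast path: no locking when the entry already exists
    if (cached != null) {
      return cached;
    }
    synchronized (this) {        // slow path: only one thread parses the metadata
      cached = cache.get(key);   // another thread may have filled the entry meanwhile
      if (cached == null) {
        cached = parser.apply(key);
        cache.put(key, cached);
      }
      return cached;
    }
  }
}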


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549178
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
 
 Review comment:
   done
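
DefaultFilter in the quoted hunk accepts only files with the .carbondata extension when scanning a table directory. The same idea in plain java.io, as a runnable sketch (the directory path is a placeholder, not the connector's real store layout):

import java.io.File;
import java.io.FilenameFilter;

public final class CarbonDataFileLister {
  public static void main(String[] args) {
    // Placeholder path; the connector derives the real location from the table's store path.
    File segmentDir = new File("/tmp/carbon-store/my_table");
    FilenameFilter carbonDataOnly = (dir, name) -> name.endsWith(".carbondata");
    File[] dataFiles = segmentDir.listFiles(carbonDataOnly);
    if (dataFiles != null) {
      for (File dataFile : dataFiles) {
        System.out.println(dataFile.getName());
      }
    }
  }
}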


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549179
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for the Carbon reader; with this cache,
+   * the metadata of a table is read from the file system only once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
 
 Review comment:
   done
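
The static imports of ACCESS_KEY, SECRET_KEY and ENDPOINT from org.apache.hadoop.fs.s3a.Constants in the quoted hunk indicate the reader can also point at an S3-compatible store. A hedged sketch of setting those properties on a Hadoop Configuration is shown below; the helper class and all argument values are placeholders, not the connector's actual wiring.

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;

public final class S3ConfigSketch {
  // Builds a Configuration carrying S3A credentials; all argument values are placeholders.
  static Configuration s3aConfiguration(String accessKey, String secretKey, String endpoint) {
    Configuration conf = new Configuration();
    conf.set(ACCESS_KEY, accessKey); // fs.s3a.access.key
    conf.set(SECRET_KEY, secretKey); // fs.s3a.secret.key
    conf.set(ENDPOINT, endpoint);    // fs.s3a.endpoint
    return conf;
  }
}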


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549188
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
 
 Review comment:
   done


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549207
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader acts as a facade over these utilities:
+ * 1: CarbonMetadata (the logical table)
+ * 2: FileFactory (the physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for the Carbon reader; with this cache,
+   * the metadata of a table is read from the file system only once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference<>(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * Used by presto worker nodes to initialize the metadata cache of a table.
+   *
+   * @param table the schema name and table name.
+   * @return the cached metadata model of the table.
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Refresh the cached schema timestamp of the given table (when it is cached and
+   * transactional) from the last modified time of its schema file, and clear its
+   * datamaps when the cached entry is no longer valid.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // Multiple tasks can be launched concurrently in a worker, so this block must be synchronized.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
 
 Review comment:
   done. removed
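
getInputSplits2 in the quoted hunk first collects one CarbonLocalInputSplit per blocklet, then groups the splits by segment id and file path (plus start offset for ROW_V1 streaming files) so that each data file becomes a single multi-block split. A self-contained sketch of that grouping step, with a hypothetical Split class standing in for CarbonLocalInputSplit:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class SplitGroupingSketch {
  // Hypothetical, simplified stand-in for CarbonLocalInputSplit.
  static final class Split {
    final String segmentId;
    final String path;

    Split(String segmentId, String path) {
      this.segmentId = segmentId;
      this.path = path;
    }
  }

  public static void main(String[] args) {
    List<Split> blockletSplits = Arrays.asList(
        new Split("0", "part-0-0.carbondata"),
        new Split("0", "part-0-0.carbondata"), // second blocklet of the same file
        new Split("1", "part-0-1.carbondata"));

    // Group blocklet-level splits so all splits of one file land in one multi-block split.
    Map<String, List<Split>> grouped = blockletSplits.stream()
        .collect(Collectors.groupingBy(split -> split.segmentId + split.path));

    grouped.forEach((key, group) ->
        System.out.println(key + " -> " + group.size() + " split(s)"));
  }
}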


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549211
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549248
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
 
 Review comment:
   done
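  For reference, the quoted diff anchors at the inputSplits null check, which is presumably what this reply addresses: the list is created a few lines above with new ArrayList(...), so it can never be null and the guard can simply be dropped. A minimal sketch of the same loop without the check, reusing the body and helpers (getLocations, CarbonLocalMultiBlockSplit) already shown in the diff:

      // Sketch: iterate the grouped splits directly; inputSplits cannot be null here.
      for (List<CarbonLocalInputSplit> splitList : inputSplits) {
        multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(splitList,
            splitList.stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
                .toArray(String[]::new)));
      }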

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549270
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
 
 Review comment:
  Yeah, it is not very efficient; it comes from the base code. I will analyze and optimize it later.
  Added a TODO.
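  For reference, one possible direction for that TODO is to build the per-block grouping in a single pass with a map instead of the map/groupingBy stream pipeline. A rough sketch under the same types used in the quoted diff (the string key mirrors the existing groupingBy classifier and is illustrative only; assumes java.util.LinkedHashMap is imported):

      // Hypothetical single-pass grouping: one map lookup per split instead of
      // the stream pipeline, preserving the ROW_V1 key distinction.
      Map<String, List<CarbonLocalInputSplit>> grouped = new LinkedHashMap<>();
      for (CarbonLocalInputSplit split : result) {
        String key = split.getSegmentId().concat(split.getPath());
        if (FileFormat.ROW_V1.equals(split.getFileFormat())) {
          key = key.concat(String.valueOf(split.getStart()));
        }
        grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(split);
      }
      List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(grouped.values());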

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549275
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392549291
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader is a facade over these utilities:
+ * 1: CarbonMetadata (logical table)
+ * 2: FileFactory (physical table files)
+ * 3: CarbonCommonFactory
+ * 4: DictionaryFactory (dictionary parsing utilities)
+ * Currently, it is mainly used to parse the metadata of tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
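
A side note on the filter above: org.apache.hadoop.fs.PathFilter has a single abstract
method, so the same default filter could equivalently be written as a lambda (an
optional sketch; behavior is unchanged):

    private static final PathFilter DefaultFilter =
        path -> CarbonTablePath.isCarbonDataFile(path.getName());
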
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference<>(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For a Presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the schema and table name.
+   * @param location the table path on the store.
+   * @param config the Hadoop configuration used to access the store.
+   * @return the cached CarbonTableCacheModel holding the table metadata.
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
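
A minimal usage sketch of getCarbonCache() from a worker-side caller, assuming an
already constructed reader and Hadoop Configuration; the schema name, table name and
table path below are illustrative values, not taken from this PR:

    SchemaTableName table = new SchemaTableName("default", "sales");
    CarbonTableCacheModel model =
        reader.getCarbonCache(table, "/store/default/sales", hadoopConf);
    CarbonTable carbonTable = model.getCarbonTable();
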
+
+  /**
+   * Refresh the cached schema modified time of the given table and, if the cached
+   * metadata has become stale, invalidate the datamaps cached for that table.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
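
The invalidation above hinges on comparing the schema file's last-modified time with the
time recorded when the table was cached. A self-contained sketch of that pattern, with
assumed field and method names (the real CarbonTableCacheModel may differ):

    final class ModTimeCacheEntry<V> {
      private final long cachedTime;  // modified time recorded when the entry was built
      private long currentTime;       // latest modified time observed on the schema file
      private final V value;

      ModTimeCacheEntry(long modifiedTime, V value) {
        this.cachedTime = modifiedTime;
        this.currentTime = modifiedTime;
        this.value = value;
      }

      void setCurrentTime(long time) {
        this.currentTime = time;
      }

      // the entry stays valid only while the schema file has not changed since caching
      boolean isValid() {
        return currentTime <= cachedTime;
      }

      V get() {
        return value;
      }
    }
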
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // Multiple tasks can run concurrently in a worker, hence this must be synchronized.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
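
Reduced to its skeleton, parseCarbonMetadata() is a check / lock / re-check cache load,
so concurrent worker tasks read a table's metadata from the file system only once. A
sketch of that shape, where loadFromFileSystem stands in for Steps 1 to 4 above (a
hypothetical helper, not a method of this class):

    private CarbonTableCacheModel cacheOrLoad(SchemaTableName table) {
      CarbonTableCacheModel cached = getValidCacheBySchemaTableName(table);
      if (cached != null) {
        return cached;                          // fast path, no locking
      }
      synchronized (this) {
        cached = getValidCacheBySchemaTableName(table);
        if (cached == null) {                   // another thread may have filled it
          cached = loadFromFileSystem(table);   // hypothetical: Steps 1 to 4
          carbonCache.get().put(table, cached);
        }
        return cached;
      }
    }
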
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(
+            result.stream().collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.info("Size of multiBlockSplitList: " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return multiBlockSplitList;
+  }
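
The block distribution above groups the per-block splits by a string key built from the
segment id and the file path (plus the start offset for ROW_V1 format files), so that
all blocks of one data file land in a single multi-block split. A self-contained
illustration of that grouping on plain strings (the values are made up):

    import java.util.*;
    import java.util.stream.*;

    public class GroupingDemo {
      public static void main(String[] args) {
        List<String[]> splits = Arrays.asList(
            new String[] {"0", "/store/part-0.carbondata"},
            new String[] {"0", "/store/part-0.carbondata"},
            new String[] {"1", "/store/part-1.carbondata"});
        // key = segmentId + path, mirroring the key used in getInputSplits2
        Map<String, List<String[]>> grouped = splits.stream()
            .collect(Collectors.groupingBy(s -> s[0].concat(s[1])));
        System.out.println(grouped.size());  // prints 2: two distinct files
      }
    }
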
+
+  /**
+   * Returns the list of partition specs to query, based on the domain constraints.
+   *
+   * @param constraints predicate constraints pushed down from Presto
+   * @param carbonTable the carbon table being queried
+   * @param loadMetadataDetails load metadata of the table's segments
+   * @return the pruned list of partition specs
+   * @throws IOException if the partition or segment metadata cannot be read
+   */
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    Set<PartitionSpec> partitionSpecs = new HashSet<>();
+    List<PartitionSpec> prunePartitions = new ArrayList<>();
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#issuecomment-599011078
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2466/
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services