[GitHub] [carbondata] ajantha-bhat opened a new pull request #3641: [WIP] support prestodb and prestosql

[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392227643
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -688,5 +675,105 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>prestodb</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
 
 Review comment:
   It seems there are two `activeByDefault` entries in this file.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392227716
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -688,5 +675,105 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>prestodb</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/prestosql</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/prestodb</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/prestodb</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>prestosql</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
 
 Review comment:
   It seems both the `prestodb` and `prestosql` profiles set `activeByDefault` to `true`.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392229354
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -688,5 +675,105 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>prestodb</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
 
 Review comment:
   Now the user needs to select a profile when building the Presto integration, so CI will only test the prestosql integration, right?


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392230642
 
 

 ##########
 File path: integration/presto/pom.xml
 ##########
 @@ -375,9 +374,9 @@
       <version>2.5</version>
 
 Review comment:
   Shouldn't this be commons-lang3 version 3.5? Other places depend on that version.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392232492
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
 
 Review comment:
   Please give this method a descriptive name and add a comment explaining what it does.
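
   As an illustration only, here is a minimal sketch of what a descriptively named and documented version of the grouping step inside `getInputSplits2` could look like. `SplitGroupingSketch`, `LocalSplit`, `groupSplitsByBlock` and `blockKey` are hypothetical names, and `LocalSplit` is a simplified stand-in for `CarbonLocalInputSplit` that keeps only the fields used for grouping. It is not the PR's code or the CarbonData API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch; none of these names come from the pull request. */
class SplitGroupingSketch {

  /** Simplified stand-in for CarbonLocalInputSplit (assumption: only these fields matter). */
  static final class LocalSplit {
    final String segmentId;
    final String path;
    final long start;
    final boolean rowFormat; // assumption: true models FileFormat.ROW_V1

    LocalSplit(String segmentId, String path, long start, boolean rowFormat) {
      this.segmentId = segmentId;
      this.path = path;
      this.start = start;
      this.rowFormat = rowFormat;
    }
  }

  /**
   * Groups splits by block so that all splits with the same segment and file path
   * end up in one group. Row-format splits are also keyed by their start offset,
   * so each of them forms its own group.
   */
  static List<List<LocalSplit>> groupSplitsByBlock(List<LocalSplit> splits) {
    Map<String, List<LocalSplit>> grouped = splits.stream().collect(
        Collectors.groupingBy(SplitGroupingSketch::blockKey, LinkedHashMap::new,
            Collectors.toList()));
    return new ArrayList<>(grouped.values());
  }

  /** Builds the grouping key, mirroring the concatenation used in the quoted diff. */
  private static String blockKey(LocalSplit split) {
    String key = split.segmentId + split.path;
    return split.rowFormat ? key + split.start : key;
  }

  public static void main(String[] args) {
    List<LocalSplit> splits = Arrays.asList(
        new LocalSplit("0", "/t/part-0.carbondata", 0L, false),
        new LocalSplit("0", "/t/part-0.carbondata", 1024L, false),
        new LocalSplit("1", "/t/part-1.carbondata", 0L, false));
    System.out.println(groupSplitsByBlock(splits).size()); // prints 2
  }
}
```

   The sketch uses a `LinkedHashMap` so that groups come out in encounter order, which is a choice made here for predictable output and not something the quoted diff does.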


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392233008
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return multiBlockSplitList;
+  }
+
+  /**
+   * Returns list of partition specs to query based on the domain constraints
+   *
+   * @param constraints
+   * @param carbonTable
+   * @throws IOException
+   */
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    Set<PartitionSpec> partitionSpecs = new HashSet<>();
+    List<PartitionSpec> prunePartitions = new ArrayList();
+
+    for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+      SegmentFileStore segmentFileStore = null;
+      try {
+        segmentFileStore =
+            new SegmentFileStore(carbonTable.getTablePath(), loadMetadataDetail.getSegmentFile());
+        partitionSpecs.addAll(segmentFileStore.getPartitionSpecs());
+
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
 
 Review comment:
   I think it is OK to skip the logging here. The method already declares `throws IOException`, so the try block can be removed.
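
For illustration, a minimal sketch of the loop without the local try block. `SpecSource` and `collectSpecs` are hypothetical stand-ins for `SegmentFileStore` and `findRequiredPartitions`, not CarbonData classes, and partition specs are reduced to plain strings. It only shows that a method declaring `throws IOException` can let the exception propagate to its caller.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PartitionCollectSketch {

  /** Hypothetical stand-in for a per-segment store whose read may fail on I/O. */
  interface SpecSource {
    List<String> getPartitionSpecs() throws IOException;
  }

  // No try/catch here: the method signature already exposes IOException,
  // so a failure in any source reaches the caller unchanged.
  static List<String> collectSpecs(List<SpecSource> sources) throws IOException {
    Set<String> specs = new HashSet<>();
    for (SpecSource source : sources) {
      specs.addAll(source.getPartitionSpecs());
    }
    return new ArrayList<>(specs);
  }

  public static void main(String[] args) throws IOException {
    List<SpecSource> sources = new ArrayList<>();
    sources.add(() -> Arrays.asList("country=IN", "country=US"));
    sources.add(() -> Arrays.asList("country=US"));
    // Duplicates across segments collapse in the set.
    System.out.println(collectSpecs(sources).size());
  }
}
```

If the log line is still wanted, it could be written once by the caller that finally handles the exception, which avoids logging the same failure at several levels.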


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392233839
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
 
 Review comment:
   Please add a blank line here.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392233873
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
 
 Review comment:
   Please add a blank line here.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392234002
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
 
 Review comment:
   Please add a comment describing this field.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392234136
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
 
 Review comment:
   Please add a blank line here.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392234830
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
 
 Review comment:
   Can this handle the case of reading a table after ALTER TABLE ADD/DROP COLUMN has been run in Spark?
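
To make the concern concrete, here is a hedged sketch of one way a metadata cache could notice a schema change made by another engine: store the modification time alongside the cached entry and reload when the stored copy is newer. `SchemaCacheSketch`, `SchemaStore`, and `Entry` are hypothetical names for illustration only. The quoted hunk does not show whether `CarbonTableReader` does anything like this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SchemaCacheSketch {

  /** Hypothetical cached entry: the schema plus the modification time it was read at. */
  static final class Entry {
    final String schema;
    final long modifiedTime;

    Entry(String schema, long modifiedTime) {
      this.schema = schema;
      this.modifiedTime = modifiedTime;
    }
  }

  /** Hypothetical view of the storage holding the schema file. */
  interface SchemaStore {
    long lastModified(String table);

    String readSchema(String table);
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final SchemaStore store;

  SchemaCacheSketch(SchemaStore store) {
    this.store = store;
  }

  // Reload only when there is no cached entry or the stored schema is newer.
  String getSchema(String table) {
    long current = store.lastModified(table);
    Entry entry = cache.compute(table, (name, cached) ->
        (cached == null || cached.modifiedTime < current)
            ? new Entry(store.readSchema(name), current)
            : cached);
    return entry.schema;
  }

  public static void main(String[] args) {
    final long[] time = {1L};
    final String[] schema = {"id INT"};
    SchemaStore store = new SchemaStore() {
      public long lastModified(String table) { return time[0]; }
      public String readSchema(String table) { return schema[0]; }
    };
    SchemaCacheSketch cache = new SchemaCacheSketch(store);
    System.out.println(cache.getSchema("t1"));
    // Simulate ALTER TABLE ADD COLUMN performed by another engine.
    schema[0] = "id INT, name STRING";
    time[0] = 2L;
    System.out.println(cache.getSchema("t1"));
  }
}
```

The sketch still pays one timestamp lookup per read, and it performs the reload inside `compute`, which holds the map's per-key lock during I/O. A real implementation may prefer to read outside the lock.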


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392235380
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
+                    .toArray(String[]::new)));
+          }
+        }
+        LOGGER.error("Size of MultiblockList   " + multiBlockSplitList.size());
+
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
 
 Review comment:
   Why is this required? It does not seem necessary.
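
One reading of this comment: the quoted method is already declared with `throws IOException`, so wrapping the `IOException` in a `RuntimeException` hides a checked exception that callers could otherwise handle. A minimal sketch of the two patterns follows; `SplitSource`, `loadWrapped`, and `loadDirect` are hypothetical names for illustration, not part of the PR.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in; SplitSource and both load methods are illustrative only.
class PropagateIoExceptionSketch {

  interface SplitSource {
    List<String> getSplits() throws IOException;
  }

  // Pattern in the quoted diff: declares IOException but wraps it anyway.
  static List<String> loadWrapped(SplitSource source) throws IOException {
    try {
      return source.getSplits();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Alternative: let the checked exception propagate to the caller.
  static List<String> loadDirect(SplitSource source) throws IOException {
    return source.getSplits();
  }

  public static void main(String[] args) {
    SplitSource failing = () -> {
      throw new IOException("table status unreadable");
    };
    try {
      loadDirect(failing);
    } catch (IOException e) {
      System.out.println("caller sees IOException: " + e.getMessage());
    }
    try {
      loadWrapped(failing);
    } catch (RuntimeException | IOException e) {
      System.out.println("caller sees " + e.getClass().getSimpleName());
    }
  }
}
```

With the direct version the caller can catch `IOException` by type; with the wrapped version it has to unwrap `getCause()` to learn what failed.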


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392236030
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
 
 Review comment:
   ```suggestion
         CarbonTableInputFormat<Object> carbonTableInputFormat =
   ```
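
The suggestion implies that the input format class takes a type parameter, so the declaration should be parameterized rather than raw. A small sketch of the difference, using a hypothetical generic class `Format<T>` as a stand-in (it is not the CarbonData class):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic class standing in for a generic input format.
class GenericDeclarationSketch {

  static final class Format<T> {
    private final List<T> rows = new ArrayList<>();

    void add(T row) {
      rows.add(row);
    }

    List<T> rows() {
      return rows;
    }
  }

  public static void main(String[] args) {
    // Parameterized declaration: element types are checked, no rawtypes warning.
    Format<Object> typed = new Format<>();
    typed.add("split-0");
    typed.add(42);
    List<Object> rows = typed.rows();
    System.out.println(rows.size());

    // Raw declaration, shown only for contrast; it compiles with
    // rawtypes/unchecked warnings under -Xlint:
    // Format raw = new Format();
  }
}
```

Declaring the variable as `Format<Object>` keeps the same runtime behavior as the raw form while letting the compiler verify uses of the type parameter.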


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392236215
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
 
 Review comment:
   Modify line 362 also.


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392236725
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
 
 Review comment:
   This condition is also always true (as indicated by IntelliJ IDEA).
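
   The `inputSplits != null` check at the end of the quoted hunk guards a value produced by `new ArrayList(...)`, and a constructor call never evaluates to null, so the branch is always taken. A minimal sketch of that behaviour, assuming a hypothetical `Split` class standing in for `CarbonLocalInputSplit` and a hypothetical `group` helper that mirrors the shape of the grouping code (neither name is from the PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class GroupingNullCheckSketch {

  // Hypothetical stand-in for CarbonLocalInputSplit; only the fields used as the grouping key.
  public static final class Split {
    final String segmentId;
    final String path;

    public Split(String segmentId, String path) {
      this.segmentId = segmentId;
      this.path = path;
    }
  }

  // Groups splits by segmentId + path and copies the groups into a new list.
  // The result of "new ArrayList<>(...)" is never null, even for empty input.
  public static List<List<Split>> group(List<Split> splits) {
    return new ArrayList<>(splits.stream()
        .collect(Collectors.groupingBy(s -> s.segmentId.concat(s.path)))
        .values());
  }

  public static void main(String[] args) {
    List<List<Split>> none = group(Collections.emptyList());
    System.out.println(none != null);  // prints "true"
    System.out.println(none.size());   // prints "0"

    List<List<Split>> some = group(Arrays.asList(
        new Split("0", "/t/part-0"),
        new Split("0", "/t/part-0"),
        new Split("1", "/t/part-1")));
    System.out.println(some.size());   // prints "2"
  }
}
```

   If an empty result needs special handling, checking `inputSplits.isEmpty()` would express that intent; otherwise the null check can probably just be dropped.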


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392237708
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
 
 Review comment:
   ```suggestion
               result.stream().collect(Collectors.groupingBy(
   ```
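
A minimal sketch of why the `map` step can be dropped: `result` is already a `List<CarbonLocalInputSplit>`, so casting each element to its own type changes nothing. The `Split` class and the grouping key (segment id) below are hypothetical stand-ins, since the classifier passed to `groupingBy` is cut off in the quoted diff; only `Stream.map`, `Collectors.groupingBy` and `Map.values` are real JDK calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingDemo {

  // Hypothetical stand-in for CarbonLocalInputSplit; only a grouping key and a path are modelled.
  static final class Split {
    final String segmentId;
    final String path;

    Split(String segmentId, String path) {
      this.segmentId = segmentId;
      this.path = path;
    }
  }

  // Shape used in the diff: the map step casts each element to the type it already has.
  static Map<String, List<Split>> groupWithCast(List<Split> result) {
    return result.stream().map(x -> (Split) x)
        .collect(Collectors.groupingBy(s -> s.segmentId));
  }

  // Suggested shape: collect directly from the stream.
  static Map<String, List<Split>> groupDirect(List<Split> result) {
    return result.stream().collect(Collectors.groupingBy(s -> s.segmentId));
  }

  public static void main(String[] args) {
    List<Split> result = Arrays.asList(
        new Split("0", "part-0-0"), new Split("1", "part-1-0"), new Split("0", "part-0-1"));
    // Diamond operator instead of the raw ArrayList, so the element type is checked.
    List<List<Split>> inputSplits = new ArrayList<>(groupDirect(result).values());
    // Both variants hold the same Split instances under the same keys.
    System.out.println(groupWithCast(result).equals(groupDirect(result)));
    System.out.println(inputSplits.size());
  }
}
```

Both methods return equal maps for the same input, so the cast only adds noise. Using `new ArrayList<>(...)` in place of the raw `new ArrayList(...)` would also avoid an unchecked-conversion warning, though that goes beyond the suggestion above.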


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392237708
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
 
 Review comment:
   ```suggestion
               result.stream().collect(Collectors.groupingBy(
   ```


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392238084
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
 
 Review comment:
   ```suggestion
           List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList<>(
   ```
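For context on the suggestion above, here is a minimal sketch of why the diamond operator is preferred over the raw `new ArrayList(`. The class and method names (`DiamondExample`, `copyGroups`) are hypothetical and are not part of the PR. With the diamond, the compiler infers the element type from the declared type, so no unchecked-conversion warning is produced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DiamondExample {
  // Hypothetical helper: copies the groups into a new list.
  // "new ArrayList<>(...)" lets the compiler infer List<String> as the element
  // type; the raw form "new ArrayList(...)" compiles, but with an unchecked warning.
  static List<List<String>> copyGroups(List<List<String>> groups) {
    return new ArrayList<>(groups);
  }

  public static void main(String[] args) {
    List<List<String>> groups = Arrays.asList(
        Arrays.asList("a", "b"), Arrays.asList("c"));
    List<List<String>> copy = copyGroups(groups);
    System.out.println(copy.size()); // prints 2
  }
}
```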


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392238602
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
 
 Review comment:
   This is inefficient when there are many splits.
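The comment does not say which step is costly. One possible reading is that the stream performs an identity cast on every element and builds an intermediate map before the result is copied into a list. As a sketch only, under that assumption, the grouping could be done in one plain pass. `Split` is a hypothetical stand-in for `CarbonLocalInputSplit`, and grouping by `path` is also an assumption, since the actual grouping key is cut off in the hunk above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitGrouping {
  // Hypothetical stand-in for CarbonLocalInputSplit; only the fields used here.
  static final class Split {
    final String path;
    final long start;

    Split(String path, long start) {
      this.path = path;
      this.start = start;
    }
  }

  // Groups splits by file path in a single pass, without a per-element cast.
  // LinkedHashMap keeps the groups in first-seen order.
  static List<List<Split>> groupByPath(List<Split> splits) {
    Map<String, List<Split>> byPath = new LinkedHashMap<>();
    for (Split split : splits) {
      byPath.computeIfAbsent(split.path, k -> new ArrayList<>()).add(split);
    }
    return new ArrayList<>(byPath.values());
  }

  public static void main(String[] args) {
    List<Split> splits = Arrays.asList(
        new Split("f1", 0L), new Split("f2", 0L), new Split("f1", 128L));
    System.out.println(groupByPath(splits).size()); // prints 2
  }
}
```

Whether this is actually faster than `Collectors.groupingBy` for the split counts seen in practice would need to be measured; the sketch mainly removes the redundant cast and makes the single pass explicit.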


[GitHub] [carbondata] jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql
URL: https://github.com/apache/carbondata/pull/3641#discussion_r392239003
 
 

 ##########
 File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
 ##########
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
+/**
+ * CarbonTableReader will be a facade of these utils
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ * Currently, it is mainly used to parse metadata of tables under
+ * the configured carbondata-store path and filter the relevant
+ * input splits with given query predicates.
+ */
+public class CarbonTableReader {
+
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+  public CarbonTableConfig config;
+  /**
+   * A cache for Carbon reader, with this cache,
+   * metadata of a table is only read from file system once.
+   */
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+
+  private String queryId;
+
+  /**
+   * Logger instance
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+  /**
+   * List Of Schemas
+   */
+  private List<String> schemaNames = new ArrayList<>();
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
+    populateCarbonProperties();
+  }
+
+  /**
+   * For presto worker node to initialize the metadata cache of a table.
+   *
+   * @param table the name of the table and schema.
+   * @return
+   */
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
+    }
+    return carbonTableCacheModel;
+  }
+
+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and populate the list by reading the files.
+   */
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
+      }
+    }
+  }
+
+  /**
+   * Read the metadata of the given table
+   * and cache it in this.carbonCache (CarbonTableReader cache).
+   *
+   * @param table name of the given table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
+   */
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
+    try {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
+        return cache;
+      }
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
+  public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
+    config.set("query.id", queryId);
+    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
+    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
+
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
+    LoadMetadataDetails[] loadMetadataDetails = null;
+    if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        throw e;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails);
+    }
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), filteredPartitions);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
+          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
+        }
+
+        // Use block distribution
+        List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList(
+            result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy(
+                carbonInput -> {
+                  if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
+                    return carbonInput.getSegmentId().concat(carbonInput.getPath())
+                      .concat(carbonInput.getStart() + "");
+                  }
+                  return carbonInput.getSegmentId().concat(carbonInput.getPath());
+                })).values());
+        if (inputSplits != null) {
+          for (int j = 0; j < inputSplits.size(); j++) {
+            multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
+                inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
 
 Review comment:
   Is there a more efficient way to do this when there are millions of splits?
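
   One possible direction, offered only as a sketch and not as the PR's implementation: the quoted code first groups the splits into a map of lists and then re-streams every group to collect its distinct locations. Both steps could be folded into a single pass that keeps one accumulator per group. In the example below, `Split` and `Group` are hypothetical stand-ins for `CarbonLocalInputSplit` and `CarbonLocalMultiBlockSplit`, the `rowFormat` flag stands in for the `FileFormat.ROW_V1` check, and the `#` separator in the key is an addition that is not in the quoted diff. Whether this actually helps at the scale of millions of splits has not been measured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SplitGrouping {

  // Hypothetical stand-in for CarbonLocalInputSplit: only the fields used for grouping.
  public static final class Split {
    public final String segmentId;
    public final String path;
    public final long start;
    public final boolean rowFormat; // stands in for FileFormat.ROW_V1.equals(...)
    public final String[] locations;

    public Split(String segmentId, String path, long start, boolean rowFormat,
        String[] locations) {
      this.segmentId = segmentId;
      this.path = path;
      this.start = start;
      this.rowFormat = rowFormat;
      this.locations = locations;
    }
  }

  // Hypothetical stand-in for CarbonLocalMultiBlockSplit: member splits plus distinct hosts.
  public static final class Group {
    public final List<Split> splits = new ArrayList<>();
    public final Set<String> locations = new LinkedHashSet<>();
  }

  // Groups splits by segment and path (and by start offset for row-format splits),
  // collecting each group's distinct locations in the same pass over the input.
  public static List<Group> groupInOnePass(List<Split> splits) {
    Map<String, Group> groups = new LinkedHashMap<>();
    for (Split split : splits) {
      String key = split.segmentId + "#" + split.path;
      if (split.rowFormat) {
        key = key + "#" + split.start;
      }
      Group group = groups.computeIfAbsent(key, k -> new Group());
      group.splits.add(split);
      // The set drops duplicates, so no separate distinct() step is needed.
      Collections.addAll(group.locations, split.locations);
    }
    return new ArrayList<>(groups.values());
  }
}
```

   The `LinkedHashMap` and `LinkedHashSet` keep first-seen order, so the result is deterministic for a given input order. The per-split key string is still allocated here; a composite key object would avoid that at the cost of more code.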
