GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/1099

[CARBONDATA-1232] Datamap implementation for Blocklet

Bypassed the btree implementation and used the datamap to get the pruned blocklets. Only a driver-side datamap is maintained; all required information is sent to the executors, and no datamap/btree is maintained on the executor side. The implementation classes are
org.apache.carbondata.core.indexstore.blockletindex.BlockletTableMap and
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap.
Only unsafe storage is used here to store the data.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata blocklet-datamap

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1099.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1099

----
commit 7cf48d9a9b8bd7814c90c61172e06295fd3a1064
Author: ravipesala <[hidden email]>
Date: 2017-06-17T17:23:57Z

    Added blocklet index implementation in datamap

commit 0216583315232ddb80ada23451793f7e025e33a0
Author: ravipesala <[hidden email]>
Date: 2017-06-22T04:49:52Z

    Blocklet implementation for datamap

commit 579a9ecea16ab9f837605543e1991f34e20d4420
Author: ravipesala <[hidden email]>
Date: 2017-06-25T11:45:49Z

    Added LRU cache to blocklet data map.

commit b9983cc6c01b4950d9ab06561ca16ac558fca886
Author: ravipesala <[hidden email]>
Date: 2017-06-26T14:31:35Z

    Fixed update test fail
----

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at [hidden email] or file a JIRA ticket with INFRA.
---
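The driver-side pruning flow the description outlines can be sketched roughly as below. This is a minimal illustration under simplified assumptions, not the actual CarbonData code: all class names (`Blocklet`, `BlockletDataMap`) and the single-column min/max filter are stand-ins for the real Carbon classes.

```java
import java.util.ArrayList;
import java.util.List;

public class BlockletPruneSketch {

  // Stand-in for a blocklet entry: file path, blocklet id, and the
  // min/max of one column as the index information kept on the driver.
  static class Blocklet {
    final String filePath;
    final int blockletId;
    final int min, max;
    Blocklet(String filePath, int blockletId, int min, int max) {
      this.filePath = filePath; this.blockletId = blockletId; this.min = min; this.max = max;
    }
  }

  // Driver-side datamap: holds min/max per blocklet and prunes by a
  // simple equality filter, so only surviving blocklets are scheduled.
  static class BlockletDataMap {
    private final List<Blocklet> blocklets = new ArrayList<>();

    void add(Blocklet b) { blocklets.add(b); }

    List<Blocklet> prune(int filterValue) {
      List<Blocklet> pruned = new ArrayList<>();
      for (Blocklet b : blocklets) {
        // Keep only blocklets whose min/max range can contain the value.
        if (filterValue >= b.min && filterValue <= b.max) {
          pruned.add(b);
        }
      }
      return pruned;
    }
  }

  public static void main(String[] args) {
    BlockletDataMap dataMap = new BlockletDataMap();
    dataMap.add(new Blocklet("part-0", 0, 1, 100));
    dataMap.add(new Blocklet("part-0", 1, 101, 200));
    dataMap.add(new Blocklet("part-1", 0, 201, 300));
    // Only the matching blocklets would be sent to the executors.
    System.out.println(dataMap.prune(150).size()); // prints 1
  }
}
```

Since only the pruned blocklet list crosses to the executors, no btree or datamap needs to be rebuilt on the executor side.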
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1099 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/2721/
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1099 Build Success with Spark 1.6, Please check CI http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/146/
Github user asfgit commented on the issue:
https://github.com/apache/carbondata/pull/1099 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/carbondata-pr-spark-1.6/648/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125539784

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/AbstractTableDataMap.java ---
@@ -26,28 +27,29 @@
  * DataMap at the table level, user can add any number of datamaps for one table. Depends
  * on the filter condition it can prune the blocklets.
  */
-public interface TableDataMap extends EventListener {
+public abstract class AbstractTableDataMap implements EventListener {
--- End diff --

How about naming it `TableDataMap` and renaming `DataMap` to `SegmentDataMap`?
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125539854

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---
@@ -30,7 +31,7 @@
--- End diff --

Please modify the comment; this is not an index table.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125539929

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---
@@ -92,9 +93,9 @@ public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataM
   }
 
   public void clearDataMap(String dataMapName, DataMapType mapType) {
--- End diff --

`clearDataMap` should be invoked when dropping a table.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125540172

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---
@@ -69,20 +70,20 @@ public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMa
    * @param mapType
    * @return
    */
-  public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
-      String dataMapName) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
+  public AbstractTableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+      DataMapType mapType, String dataMapName) {
+    Map<String, AbstractTableDataMap> map = dataMapMappping.get(mapType);
     if (map == null) {
       map = new HashMap<>();
       dataMapMappping.put(mapType, map);
     }
-    TableDataMap dataMap = map.get(dataMapName);
+    AbstractTableDataMap dataMap = map.get(dataMapName);
     if (dataMap != null) {
       throw new RuntimeException("Already datamap exists in that path with type " + mapType);
     }
     try {
-      //TODO create datamap using @mapType.getClassName())
+      dataMap = (AbstractTableDataMap) (Class.forName(mapType.getClassName()).newInstance());
     } catch (Exception e) {
       LOGGER.error(e);
--- End diff --

The exception should not be ignored.
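The suggested fix can be sketched minimally as follows. The factory method name and the JDK class used as a stand-in are illustrative assumptions, not Carbon's actual code: the point is that the reflection failure is wrapped and rethrown instead of being logged and swallowed.

```java
public class DataMapFactorySketch {

  public static Object createDataMap(String className) {
    try {
      return Class.forName(className).newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
      // Rethrow so the caller fails fast, instead of silently continuing
      // with an uninitialized datamap that breaks later during pruning.
      throw new RuntimeException("Failed to instantiate datamap class: " + className, e);
    }
  }

  public static void main(String[] args) {
    // java.util.ArrayList stands in for a datamap implementation class name.
    Object dataMap = createDataMap("java.util.ArrayList");
    System.out.println(dataMap.getClass().getName()); // prints java.util.ArrayList
  }
}
```

With this shape, a misconfigured class name surfaces immediately at datamap creation rather than as a null-pointer failure during a query.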
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125539888

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---
@@ -30,7 +31,7 @@
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
-  private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+  private Map<DataMapType, Map<String, AbstractTableDataMap>> dataMapMappping = new HashMap<>();
--- End diff --

Add a comment describing this map.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125540590

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletTableMap.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.AbstractTableDataMap;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Table map for blocklet
+ */
+public class BlockletTableMap extends AbstractTableDataMap {
+
+  private String dataMapName;
+
+  private AbsoluteTableIdentifier identifier;
+
+  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    cache = CacheProvider.getInstance()
--- End diff --

Why is this cached again? It is already cached in `DataMapStoreManager`.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125541084

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java ---
@@ -20,7 +20,7 @@
  * Datamap type
  */
 public enum DataMapType {
-  BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+  BLOCKLET("org.apache.carbondata.core.indexstore.blockletindex.BlockletTableMap");
--- End diff --

Please keep a Class object in the `DataMapStoreManager` instead of using this enum.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125541188

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---
@@ -30,7 +31,7 @@
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
-  private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+  private Map<DataMapType, Map<String, AbstractTableDataMap>> dataMapMappping = new HashMap<>();
--- End diff --

I feel it is better to keep `Map<TableIdentifier, Map<Class, TableDataMap>>`, so a client can easily get all data maps for one table.
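The layout the reviewer proposes can be sketched as below: key the registry by table identifier first, then by datamap class, so all datamaps of one table are one lookup away. The types here are simplified stand-ins for the Carbon classes, not the real API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DataMapRegistrySketch {

  interface TableDataMap { }
  static class BlockletMap implements TableDataMap { }

  // table identifier -> (datamap implementation class -> instance)
  private final Map<String, Map<Class<?>, TableDataMap>> maps = new HashMap<>();

  void register(String tableId, TableDataMap dataMap) {
    maps.computeIfAbsent(tableId, t -> new HashMap<>()).put(dataMap.getClass(), dataMap);
  }

  // All datamaps of one table, e.g. for clearing them on DROP TABLE.
  Map<Class<?>, TableDataMap> dataMapsOf(String tableId) {
    return maps.getOrDefault(tableId, Collections.emptyMap());
  }

  public static void main(String[] args) {
    DataMapRegistrySketch registry = new DataMapRegistrySketch();
    registry.register("default.t1", new BlockletMap());
    System.out.println(registry.dataMapsOf("default.t1").size()); // prints 1
  }
}
```

Keying by table first also makes the drop-table cleanup trivial: remove the table's entry and every datamap registered for it goes with it.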
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125542074

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/AbstractTableDataMap.java ---
@@ -84,11 +121,11 @@
    * @param filterExp
    * @return
    */
-  boolean isFiltersSupported(FilterResolverIntf filterExp);
+  public abstract boolean isFiltersSupported(FilterResolverIntf filterExp);
--- End diff --

I feel it is better to accept a `FilterType` as input and return a boolean, so the client does not depend on Carbon's filter interface.
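One possible shape of this decoupling is sketched below: the datamap reports support per broad filter category instead of taking a `FilterResolverIntf`. The `FilterType` values and classes here are illustrative assumptions, not the actual Carbon enum or API.

```java
public class FilterSupportSketch {

  enum FilterType { EQUALTO, GREATERTHAN, LIKE }

  interface TableDataMap {
    boolean isFilterSupported(FilterType type);
  }

  static class BlockletDataMap implements TableDataMap {
    // A min/max based blocklet index can evaluate equality and range
    // filters, but not arbitrary pattern matches.
    @Override public boolean isFilterSupported(FilterType type) {
      return type == FilterType.EQUALTO || type == FilterType.GREATERTHAN;
    }
  }

  public static void main(String[] args) {
    TableDataMap dataMap = new BlockletDataMap();
    System.out.println(dataMap.isFilterSupported(FilterType.EQUALTO)); // prints true
    System.out.println(dataMap.isFilterSupported(FilterType.LIKE));    // prints false
  }
}
```

A client checking a plain enum value avoids pulling in the whole `org.apache.carbondata.core.scan.filter.resolver` hierarchy just to ask a yes/no question.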
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125543490

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---
@@ -0,0 +1,88 @@
[standard Apache License 2.0 header]
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * Index row
+ */
+public abstract class DataMapRow {
--- End diff --

Can you describe this class in more detail, e.g. that it contains a list of fields which the client can fill by ordinal?
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125546037

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletTableMap.java ---
@@ -0,0 +1,123 @@
[license header, imports and fields identical to the quote above]
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    cache = CacheProvider.getInstance()
+        .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+  }
+
+  @Override public DataMapWriter getMetaDataWriter() {
--- End diff --

I think we can defer this abstraction until there is table-level data map metadata, so we can delete it for now.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125546095

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletTableMap.java ---
@@ -0,0 +1,123 @@
[license header, imports and fields identical to the quote above]
+  @Override public DataMapWriter getMetaDataWriter() {
+    return null;
+  }
+
+  @Override
+  public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+    return null;
+  }
+
+  @Override protected List<DataMap> getDataMaps(String segmentId) {
--- End diff --

Please move this interface to a `DataMap.Builder` interface, and also add a `DataMap.Writer` interface.
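The refactoring the reviewer suggests could look roughly like the sketch below: per-segment construction logic moves behind a nested `DataMap.Builder` interface instead of a protected method on the table map. All names here are illustrative stand-ins; the thread does not show the final interface.

```java
import java.util.Arrays;
import java.util.List;

public class DataMapBuilderSketch {

  interface DataMap {
    // Nested builder: given a segment id, produce the datamaps for it.
    interface Builder {
      List<DataMap> build(String segmentId);
    }
  }

  static class BlockletDataMap implements DataMap {
    final String segmentId;
    BlockletDataMap(String segmentId) { this.segmentId = segmentId; }
  }

  static class BlockletDataMapBuilder implements DataMap.Builder {
    @Override public List<DataMap> build(String segmentId) {
      // Real code would list the segment's index files and load one
      // datamap per file; here a single stand-in datamap is returned.
      return Arrays.asList(new BlockletDataMap(segmentId));
    }
  }

  public static void main(String[] args) {
    DataMap.Builder builder = new BlockletDataMapBuilder();
    System.out.println(builder.build("0").size()); // prints 1
  }
}
```

With construction behind a builder, the table map class no longer needs to be abstract or extensible, which is the follow-up point made in the next comment.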
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125546189

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletTableMap.java ---
@@ -0,0 +1,123 @@
[license header and imports identical to the quote above]
+/**
+ * Table map for blocklet
+ */
+public class BlockletTableMap extends AbstractTableDataMap {
--- End diff --

After creating the `DataMap.Builder` interface, we can remove the abstract class and make this class final. It should not be extensible.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125546253

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormatNew.java ---
@@ -0,0 +1,566 @@
[standard Apache License 2.0 header]
+package org.apache.carbondata.hadoop;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.AbstractTableDataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Carbon Input format class representing one carbon table
+ */
+public class CarbonInputFormatNew<T> extends FileInputFormat<Void, T> {
--- End diff --

Can you change it to implement `CarbonTableInputFormat`? You can remove the `Segment` class if it is not required.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r125546316

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormatNew.java ---
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.AbstractTableDataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Carbon Input format class representing one carbon table
+ */
+public class CarbonInputFormatNew<T> extends FileInputFormat<Void, T> {
+
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  // comma separated list of input files
+  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+  private static final Log LOG = LogFactory.getLog(CarbonInputFormatNew.class);
+  private static final String FILTER_PREDICATE =
+      "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+
+  /**
+   * It is optional, if user does not set then it reads from store
+   *
+   * @param configuration
+   * @param carbonTable
+   * @throws IOException
+   */
+  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+      throws IOException {
+    if (null != carbonTable) {
+      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    }
+  }
+
+  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+    String carbonTableStr = configuration.get(CARBON_TABLE);
+    if (carbonTableStr == null) {
+      populateCarbonTable(configuration);
+      // read it from schema file in the store
+      carbonTableStr = configuration.get(CARBON_TABLE);
+      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+    }
+    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  }
+
+  /**
+   * this method will read the schema from the physical file and populate into CARBON_TABLE
+   *
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration) throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
+  }
+
+  public static void setTablePath(Configuration configuration, String tablePath)
+      throws IOException {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
+
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
+
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
+    }
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
+  }
+
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+  public static void setCarbonReadSupport(Configuration configuration,
+      Class<? extends CarbonReadSupport> readSupportClass) {
+    if (readSupportClass != null) {
+      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+    }
+  }
+
+  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(absIdentifier);
+  }
+
+  /**
+   * Set list of segments to access
+   */
+  public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+    configuration.set(CarbonInputFormatNew.INPUT_SEGMENT_NUMBERS,
+        CarbonUtil.getSegmentString(validSegments));
+  }
+
+  /**
+   * Set list of files to access
+   */
+  public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+    configuration.set(CarbonInputFormatNew.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+  }
+
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+  }
+
+  /**
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR
+   * are used to get table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
+   */
+  @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    AbstractTableDataMap blockletMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+    // get all valid segments and set them into the configuration
+    if (getSegmentsToAccess(job).length == 0) {
+      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+          segmentStatusManager.getValidAndInvalidSegments();
+      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+      setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
--- End diff --

just pass the segment list to `getSplit` at line 290, instead of setting it in configuration
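The suggestion above is to avoid the round trip of serializing the valid segment list into the Hadoop Configuration only to parse it back during split computation, and instead hand the list to the split helper directly. A minimal sketch of the two shapes follows; the names `SegmentSplitSketch`, `ConfigLike`, `viaConfig`, and `direct` are hypothetical stand-ins for illustration, not CarbonData APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SegmentSplitSketch {

  // Minimal stand-in for a Hadoop Configuration: a string-to-string map.
  static class ConfigLike {
    private final Map<String, String> props = new HashMap<>();
    void set(String key, String value) { props.put(key, value); }
    String get(String key) { return props.get(key); }
  }

  // Config round trip: join the list into a string, store it, split it back.
  static List<String> viaConfig(ConfigLike conf, List<String> validSegments) {
    conf.set("segments", String.join(",", validSegments));
    return Arrays.asList(conf.get("segments").split(","));
  }

  // Direct parameter passing: the helper receives the already-resolved list,
  // so there is no string encoding to get wrong.
  static List<String> direct(List<String> validSegments) {
    return validSegments;
  }
}
```

Both shapes yield the same list; the direct form simply removes the intermediate string encoding, which is the reviewer's point.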
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1099#discussion_r126063998

--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/AbstractTableDataMap.java ---
@@ -26,28 +27,29 @@
  * DataMap at the table level, user can add any number of datamaps for one table. Depends
  * on the filter condition it can prune the blocklets.
  */
-public interface TableDataMap extends EventListener {
+public abstract class AbstractTableDataMap implements EventListener {
--- End diff --

Since it is an abstract class it is better named `AbstractTableDataMap`, and one segment can have multiple datamaps, so `SegmentDataMap` would not justify the name.
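The naming discussion turns on the table-level datamap aggregating over many segments, each of which may carry its own datamaps. A minimal sketch of that shape is below; `TableDataMapSketch`, `pruneSegment`, and the nested `Blocklet` and `Filter` types are simplified stand-ins for illustration, not the richer API in the PR.

```java
import java.util.ArrayList;
import java.util.List;

public abstract class TableDataMapSketch {

  // Simplified stand-ins for a blocklet reference and a filter condition.
  static class Blocklet {
    final String path;
    Blocklet(String path) { this.path = path; }
  }

  interface Filter {
    boolean matches(String segmentId);
  }

  // Subclasses decide how a single segment is pruned (e.g. via a btree or
  // an unsafe in-memory index); this sketch leaves that open.
  protected abstract List<Blocklet> pruneSegment(String segmentId, Filter filter);

  // The table-level class aggregates: prune every segment and collect the
  // surviving blocklets. A null filter means "keep everything".
  public final List<Blocklet> prune(List<String> segmentIds, Filter filter) {
    List<Blocklet> result = new ArrayList<>();
    for (String segmentId : segmentIds) {
      if (filter == null || filter.matches(segmentId)) {
        result.addAll(pruneSegment(segmentId, filter));
      }
    }
    return result;
  }
}
```

An abstract class fits here because the aggregation loop is shared behavior while per-segment pruning varies, which an interface alone cannot express in pre-Java-8 code like this module.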