[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap

48 messages
[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2243/



---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r244344781
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.datamap.DataMapDistributable;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.DataMapMeta;
    +import org.apache.carbondata.core.datamap.DataMapStoreManager;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.TableDataMap;
    +import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.features.TableOperation;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.events.Event;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * Min Max DataMap Factory
    + */
    +@InterfaceAudience.Internal
    +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
    +  private DataMapMeta dataMapMeta;
    +  private String dataMapName;
    +  // segmentId -> list of index files
    +  private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
    +  private Cache<MinMaxDataMapCacheKeyValue.Key, MinMaxDataMapCacheKeyValue.Value> cache;
    +
    +  public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    super(carbonTable, dataMapSchema);
    +
    +    // this is an example for datamap, we can choose the columns and operations that
    +    // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
    +
    +    this.dataMapName = dataMapSchema.getDataMapName();
    +    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +
    +    // operations that will be supported on the indexed columns
    +    List<ExpressionType> optOperations = new ArrayList<>();
    +    optOperations.add(ExpressionType.NOT);
    +    optOperations.add(ExpressionType.EQUALS);
    +    optOperations.add(ExpressionType.NOT_EQUALS);
    +    optOperations.add(ExpressionType.GREATERTHAN);
    +    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.LESSTHAN);
    +    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.IN);
    +    this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
    +
    +    // init cache. note that the createCache ensures the singleton of the cache
    +    try {
    +      this.cache = CacheProvider.getInstance()
    +          .createCache(new CacheType("minmax_cache"), MinMaxDataMapCache.class.getName());
    +    } catch (Exception e) {
    +      LOGGER.error("Failed to create cache for minmax datamap", e);
    +      throw new MalformedDataMapCommandException(e.getMessage());
    +    }
    +  }
    +
    +  /**
    +   * createWriter will return the MinMaxDataWriter.
    +   *
    +   * @param segment
    +   * @param shardName
    +   * @return
    +   */
    +  @Override
    +  public DataMapWriter createWriter(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    if (LOGGER.isDebugEnabled()) {
    +      LOGGER.debug(String.format(
    +          "Data of MinMaxDataMap %s for table %s will be written to %s",
    +          dataMapName, getCarbonTable().getTableName(), shardName));
    +    }
    +    return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  @Override
    +  public DataMapBuilder createBuilder(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  /**
    +   * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    +   *
    +   * @param segment
    +   * @return
    +   * @throws IOException
    +   */
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
    +    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
    +    if (shardPaths == null) {
    +      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    +          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
    +      CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +      shardPaths = new HashSet<>();
    +      for (CarbonFile carbonFile : carbonFiles) {
    +        shardPaths.add(carbonFile.getAbsolutePath());
    +      }
    +      segmentMap.put(segment.getSegmentNo(), shardPaths);
    +    }
    +
    +    for (String shard : shardPaths) {
    +      MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
    +      dataMap.init(new MinMaxDataMapModel(shard, cache, segment.getConfiguration()));
    +      dataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +      dataMaps.add(dataMap);
    +    }
    +    return dataMaps;
    +  }
    +
    +  @Override
    +  public DataMapMeta getMeta() {
    +    return this.dataMapMeta;
    +  }
    +
    +  @Override
    +  public DataMapLevel getDataMapLevel() {
    +    return DataMapLevel.CG;
    +  }
    +
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
    +      throws IOException {
    +    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    +    MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
    +    String indexPath = ((MinMaxDataMapDistributable) distributable).getIndexPath();
    +    minMaxIndexDataMap.init(
    +        new MinMaxDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
    +    minMaxIndexDataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +    coarseGrainDataMaps.add(minMaxIndexDataMap);
    +    return coarseGrainDataMaps;
    +  }
    +
    +  /**
    +   * returns all the directories of lucene index files for query
    +   * Note: copied from BloomFilterDataMapFactory, will extract to a common interface
    +   */
    +  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
    +    List<CarbonFile> indexDirs = new ArrayList<>();
    +    List<TableDataMap> dataMaps;
    +    try {
    +      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
    +      // the path till the index file directories in all datamaps folders present in each segment
    +      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    +    } catch (IOException ex) {
    +      LOGGER.error(String.format(
    +          "failed to get datamaps for tablePath %s, segmentId %s", tablePath, segmentId), ex);
    +      throw new RuntimeException(ex);
    +    }
    +    if (dataMaps.size() > 0) {
    +      for (TableDataMap dataMap : dataMaps) {
    +        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    +          List<CarbonFile> indexFiles;
    +          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
    +              dataMap.getDataMapSchema().getDataMapName());
    +          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath);
    +          indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
    +            @Override
    +            public boolean accept(CarbonFile file) {
    +              return file.isDirectory();
    +            }
    +          }));
    +          indexDirs.addAll(indexFiles);
    +        }
    +      }
    +    }
    +    return indexDirs.toArray(new CarbonFile[0]);
    +  }
    +
    +  @Override
    +  public List<DataMapDistributable> toDistributable(Segment segment) {
    +    List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
    +    CarbonFile[] indexDirs =
    +        getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
    +    if (segment.getFilteredIndexShardNames().size() == 0) {
    +      for (CarbonFile indexDir : indexDirs) {
    +        DataMapDistributable bloomDataMapDistributable =
    +            new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +        dataMapDistributableList.add(bloomDataMapDistributable);
    +      }
    +      return dataMapDistributableList;
    +    }
    +    for (CarbonFile indexDir : indexDirs) {
    +      // Filter out the tasks which are filtered through CG datamap.
    +      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
    +        continue;
    +      }
    +      DataMapDistributable bloomDataMapDistributable =
    +          new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +      dataMapDistributableList.add(bloomDataMapDistributable);
    +    }
    +    return dataMapDistributableList;
    +  }
    +
    +  @Override
    +  public void fireEvent(Event event) {
    +
    +  }
    +
    +  @Override
    +  public void clear(Segment segment) {
    +    Set<String> shards = segmentMap.remove(segment.getSegmentNo());
    +    if (null != shards) {
    +      for (String shard : shards) {
    +        cache.invalidate(new MinMaxDataMapCacheKeyValue.Key(shard));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public synchronized void clear() {
    +    if (segmentMap.size() > 0) {
    +      List<String> segments = new ArrayList<>(segmentMap.keySet());
    +      for (String segmentId : segments) {
    +        clear(new Segment(segmentId, null, null));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData(Segment segment) throws IOException {
    +    try {
    +      String segmentId = segment.getSegmentNo();
    +      String datamapPath = CarbonTablePath
    +          .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
    +      if (FileFactory.isFileExist(datamapPath)) {
    +        CarbonFile file = FileFactory.getCarbonFile(datamapPath);
    +        CarbonUtil.deleteFoldersAndFilesSilent(file);
    +      }
    +    } catch (InterruptedException ex) {
    +      throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo());
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData() {
    +    SegmentStatusManager ssm =
    +        new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
    +    try {
    +      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
    +      for (Segment segment : validSegments) {
    +        deleteDatamapData(segment);
    +      }
    +    } catch (IOException e) {
    +      LOGGER.error("drop datamap failed, failed to delete datamap directory");
    +    }
    +  }
    +
    +  @Override
    +  public boolean willBecomeStale(TableOperation operation) {
    +    switch (operation) {
    +      case ALTER_DROP:
    +      case ALTER_CHANGE_DATATYPE:
    +      case DELETE:
    +      case UPDATE:
    +      case PARTITION:
    +        return true;
    +      default:
    +        return false;
    +    }
    +  }
    +
    +  @Override
    +  public boolean isOperationBlocked(TableOperation operation, Object... targets) {
    +    switch (operation) {
    +      case ALTER_DROP: {
    --- End diff --
   
    Yes, this was copied from the implementation of the BloomFilter datamap.


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r245013088
 
    --- Diff: pom.xml ---
    @@ -527,6 +526,7 @@
             <module>examples/spark2</module>
             <module>datamap/lucene</module>
             <module>datamap/bloom</module>
    +        <module>datamap/example</module>
    --- End diff --
   
    I think it is better not to add this, since it will make the assembly bigger


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r245633754
 
    --- Diff: pom.xml ---
    @@ -527,6 +526,7 @@
             <module>examples/spark2</module>
             <module>datamap/lucene</module>
             <module>datamap/bloom</module>
    +        <module>datamap/example</module>
    --- End diff --
   
    Excluding this will cause the datamap example module to become outdated and accumulate unfixed bugs later, which was the previous status of this module.
   
    Maybe we can exclude this from the assembly jar
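
    A hypothetical sketch of what "exclude this from the assembly jar" could look like via the maven-shade-plugin; the groupId/artifactId coordinates and plugin placement below are assumptions for illustration, not taken from the actual CarbonData pom.xml:

    ```xml
    <!-- Sketch only: keep datamap/example in the reactor build so it stays
         maintained, but leave it out of the shaded assembly jar.
         The artifact coordinates below are assumed, not verified. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <configuration>
        <artifactSet>
          <excludes>
            <exclude>org.apache.carbondata:carbondata-datamap-examples</exclude>
          </excludes>
        </artifactSet>
      </configuration>
    </plugin>
    ```

    With this approach the module would still compile and run its tests on every CI build, while the fat jar size is unaffected.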


---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    @jackylk Actually, after applying the above commit, the size of the shaded jar decreased from 40652 bytes to 40620 bytes


---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2197/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10454/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2414/



---