[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185429065
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    +      CarbonTable carbonTable) throws MalformedDataMapCommandException {
    +    List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +    Set<String> unique = new HashSet<>();
    +    for (CarbonColumn indexColumn : indexColumns) {
    +      unique.add(indexColumn.getColName());
    +    }
    +    if (unique.size() != indexColumns.size()) {
    +      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
    +    }
    +  }
    +
    +  public static String[] getIndexColumns(DataMapSchema dataMapSchema)
    --- End diff --
   
    Why should it be static? Better to make it non-static and not pass the schema, since the factory already has it. Otherwise, it is better to move it to a utility class instead of keeping it in the abstract class.
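The reviewer's alternative, a utility class instead of a static method on the abstract class, could look roughly like the minimal sketch below. It applies the four checks listed in the `validateIndexedColumns` javadoc to a comma-separated INDEX_COLUMNS value. `IndexColumnsUtil` and `parseIndexColumns` are hypothetical names, table columns are modelled as a set of lower-case names, and `IllegalArgumentException` stands in for `MalformedDataMapCommandException`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical utility holding the INDEX_COLUMNS handling outside the abstract
// DataMapFactory. IllegalArgumentException stands in for
// MalformedDataMapCommandException.
final class IndexColumnsUtil {
  static final String INDEX_COLUMNS = "INDEX_COLUMNS";

  private IndexColumnsUtil() { }

  /** Returns the trimmed, lower-cased index column names in declaration order. */
  static List<String> parseIndexColumns(Map<String, String> properties,
      Set<String> tableColumns) {
    // 1. the property is required
    String value = properties.get(INDEX_COLUMNS);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException(INDEX_COLUMNS + " property is required");
    }
    List<String> result = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    // limit -1 keeps trailing empty entries, so "a," is rejected below
    for (String raw : value.split(",", -1)) {
      String column = raw.trim().toLowerCase(Locale.ROOT);
      // 2. no empty or blank entries
      if (column.isEmpty()) {
        throw new IllegalArgumentException(INDEX_COLUMNS + " contains invalid column name");
      }
      // 3. no duplicate columns (compared case-insensitively)
      if (!seen.add(column)) {
        throw new IllegalArgumentException(INDEX_COLUMNS + " has duplicate column: " + column);
      }
      // 4. every column must exist in the table (tableColumns assumed lower-case)
      if (!tableColumns.contains(column)) {
        throw new IllegalArgumentException(
            String.format("column '%s' does not exist in table", column));
      }
      result.add(column);
    }
    return result;
  }
}
```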


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185429305
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java ---
    @@ -42,67 +43,80 @@
      *
      * <br>Currently CarbonData supports following provider:
      * <ol>
    - *   <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li>
    - *   <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension
    - *     of the table </li>
    - *   <li> class name of {@link org.apache.carbondata.core.datamap.dev.DataMapFactory}
    - * implementation: Developer can implement new type of DataMap by extending
    - * {@link org.apache.carbondata.core.datamap.dev.DataMapFactory} </li>
    + *   <li> preaggregate: pre-aggregate table of single table </li>
    + *   <li> timeseries: pre-aggregate table based on time dimension of the table </li>
    + *   <li> lucene: index backed by Apache Lucene </li>
    + *   <li> bloomfilter: index backed by Bloom Filter </li>
      * </ol>
      *
      * @since 1.4.0
      */
     @InterfaceAudience.Internal
    -public interface DataMapProvider {
    +public abstract class DataMapProvider {
    +
    +  private CarbonTable mainTable;
    +  private DataMapSchema dataMapSchema;
    +
    +  public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
    +    this.mainTable = mainTable;
    +    this.dataMapSchema = dataMapSchema;
    +  }
    +
    +  protected final CarbonTable getMainTable() {
    +    return mainTable;
    +  }
    +
    +  protected final DataMapSchema getDataMapSchema() {
    +    return dataMapSchema;
    +  }
     
       /**
        * Initialize a datamap's metadata.
        * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
        * Implementation should initialize metadata for datamap, like creating table
        */
    -  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement)
    -      throws MalformedDataMapCommandException, IOException;
    +  public abstract void initMeta(String ctasSqlStatement) throws MalformedDataMapCommandException,
    +      IOException;
     
       /**
        * Initialize a datamap's data.
        * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable"
        * Implementation should initialize data for datamap, like creating data folders
        */
    -  void initData(CarbonTable mainTable);
    +  public abstract void initData();
     
       /**
    -   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String)}.
    +   * Opposite operation of {@link #initMeta(String)}.
        * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable"
        * Implementation should clean all meta for the datamap
        */
    -  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException;
    +  public abstract void freeMeta() throws IOException;
    --- End diff --
   
    I changed them to `cleanMeta` and `cleanData`.
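A minimal sketch of the constructor-injection style in this diff, assuming `cleanMeta` and `cleanData` are the counterparts of `initMeta` and `initData`: the provider receives the table and schema once, so the lifecycle methods need no table or schema parameters. Plain strings stand in for `CarbonTable` and `DataMapSchema`, and `ProviderSketch` and `InMemoryProvider` are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the constructor-injection style in the DataMapProvider diff.
// Strings stand in for CarbonTable and DataMapSchema; all names are hypothetical.
abstract class ProviderSketch {
  private final String mainTable;
  private final String dataMapName;

  ProviderSketch(String mainTable, String dataMapName) {
    this.mainTable = mainTable;
    this.dataMapName = dataMapName;
  }

  protected final String getMainTable() { return mainTable; }

  protected final String getDataMapName() { return dataMapName; }

  /** CREATE DATAMAP: register metadata. */
  abstract void initMeta(String ctasSqlStatement);

  /** CREATE DATAMAP: create data such as folders. */
  abstract void initData();

  /** DROP DATAMAP: opposite of initMeta. */
  abstract void cleanMeta();

  /** DROP DATAMAP: opposite of initData (assumed pairing). */
  abstract void cleanData();
}

// In-memory stand-in: the sets play the role of the metastore and the file system.
class InMemoryProvider extends ProviderSketch {
  final Set<String> catalog = new HashSet<>();
  final Set<String> dataFolders = new HashSet<>();

  InMemoryProvider(String mainTable, String dataMapName) {
    super(mainTable, dataMapName);
  }

  private String key() {
    return getMainTable() + "." + getDataMapName();
  }

  @Override void initMeta(String ctasSqlStatement) { catalog.add(key()); }

  @Override void initData() { dataFolders.add(key()); }

  @Override void cleanMeta() { catalog.remove(key()); }

  @Override void cleanData() { dataFolders.remove(key()); }
}
```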


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185429862
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    --- End diff --
   
    It needs to be overridden by LuceneDataMapFactoryBase, so I could not make it static.
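The point about overriding can be illustrated with a small sketch: Java static methods are not overridden (a subclass can only hide them), so a call through a base-class reference would never reach subclass-specific validation. The class names and the string-column rule in the Lucene-like subclass are illustrative assumptions, not CarbonData's code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Base factory: validation is an instance method, so subclasses can override it
// and callers holding a FactorySketch reference get the subclass behaviour.
abstract class FactorySketch {
  /** Shared rule for all index datamaps: no duplicate columns. */
  void validateIndexedColumns(List<String> indexColumns) {
    Set<String> unique = new HashSet<>(indexColumns);
    if (unique.size() != indexColumns.size()) {
      throw new IllegalArgumentException("INDEX_COLUMNS has duplicate column");
    }
  }
}

// Uses the shared rule unchanged.
class BloomFactorySketch extends FactorySketch { }

// Adds a hypothetical extra rule on top of the shared one.
class LuceneFactorySketch extends FactorySketch {
  private final Set<String> stringColumns;

  LuceneFactorySketch(Set<String> stringColumns) {
    this.stringColumns = stringColumns;
  }

  @Override
  void validateIndexedColumns(List<String> indexColumns) {
    super.validateIndexedColumns(indexColumns);
    for (String column : indexColumns) {
      if (!stringColumns.contains(column)) {
        throw new IllegalArgumentException("Only string columns can be indexed: " + column);
      }
    }
  }
}
```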


---

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185430793
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap.dev;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +
    +@InterfaceAudience.Developer("DataMap")
    +public interface DataMapRefresher {
    --- End diff --
   
    I don't think a separate interface is required for this. It should use the same writer interface: it would be complex for developers to implement so many interfaces for one datamap. We should use the writer flow even for refreshing.
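What using the writer flow for refreshing could mean is sketched below, under the assumption that existing segment data can be replayed block by block and blocklet by blocklet: a hypothetical `RefreshDriver` calls the same callbacks that data loading calls, so a datamap implements only one writer class. Callback names follow the `DataMapWriter` diff; the `Object[]` row type and the nested-list data layout are simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Writer callbacks, named after the DataMapWriter diff (row type simplified).
abstract class WriterSketch {
  abstract void onBlockStart(String blockId);
  abstract void onBlockletStart(int blockletId);
  abstract void addRow(Object[] row);
  abstract void onBlockletEnd(int blockletId);
  abstract void onBlockEnd(String blockId);
}

// Hypothetical refresh driver that replays existing data through the writer.
final class RefreshDriver {
  private RefreshDriver() { }

  /** blocks.get(b).get(l) holds the rows of blocklet l in block b. */
  static void refresh(List<List<List<Object[]>>> blocks, WriterSketch writer) {
    for (int b = 0; b < blocks.size(); b++) {
      String blockId = "block-" + b;
      writer.onBlockStart(blockId);
      List<List<Object[]>> blocklets = blocks.get(b);
      for (int l = 0; l < blocklets.size(); l++) {
        writer.onBlockletStart(l);
        for (Object[] row : blocklets.get(l)) {
          writer.addRow(row);
        }
        writer.onBlockletEnd(l);
      }
      writer.onBlockEnd(blockId);
    }
  }
}

// Records the callback sequence so the flow can be inspected.
class TracingWriter extends WriterSketch {
  final List<String> events = new ArrayList<>();

  @Override void onBlockStart(String blockId) { events.add("blockStart:" + blockId); }
  @Override void onBlockletStart(int id) { events.add("blockletStart:" + id); }
  @Override void addRow(Object[] row) { events.add("row:" + row[0]); }
  @Override void onBlockletEnd(int id) { events.add("blockletEnd:" + id); }
  @Override void onBlockEnd(String blockId) { events.add("blockEnd:" + blockId); }
}
```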


---

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185431123
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---
    @@ -66,22 +85,22 @@ public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletStart(int blockletId);
    +  public abstract void onBlockletStart(int blockletId) throws IOException;
     
       /**
        * End of blocklet notification
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletEnd(int blockletId);
    +  public abstract void onBlockletEnd(int blockletId) throws IOException;
     
       /**
    -   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
    +   * Add row data to the datamap, order of field is same as `indexColumns` in
        * DataMapMeta returned in DataMapFactory.
    -   * Implementation should copy the content of `pages` as needed, because `pages` memory
    -   * may be freed after this method returns, if using unsafe column page.
    +   * Implementation should copy the content of `row` as needed, because its memory
    +   * may be freed after this method returns, in case of unsafe memory
        */
    -  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    +  public abstract void addRow(int blockletId, int pageId, int rowId, CarbonRow row)
    --- End diff --
   
    I feel the old interface is better, since it does not require converting pages to rows; that conversion might impact performance.
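To make the performance concern concrete, this sketch shows the transposition a row-based callback implies when the data starts out column-major: every row needs a new `Object[]` and a copy of each value. Modelling a page as a plain `Object[]` is a simplification (real `ColumnPage` data may be encoded or stored off-heap), and `PageToRows` is a hypothetical name.

```java
// Hypothetical helper: turns column-major page data into one array per row.
final class PageToRows {
  private PageToRows() { }

  /** pages[c][r] is the value of column c at row r; returns rows[r][c]. */
  static Object[][] toRows(Object[][] pages) {
    if (pages.length == 0) {
      return new Object[0][];
    }
    int rowCount = pages[0].length;
    Object[][] rows = new Object[rowCount][];
    for (int r = 0; r < rowCount; r++) {
      Object[] row = new Object[pages.length]; // one extra allocation per row
      for (int c = 0; c < pages.length; c++) {
        row[c] = pages[c][r]; // one copy per value
      }
      rows[r] = row;
    }
    return rows;
  }
}
```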


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185431282
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -986,4 +990,26 @@ public boolean canAllow(CarbonTable carbonTable, TableOperation operation) {
         return true;
       }
     
    +  /**
    +   * Get all index columns specified by dataMapSchema
    +   */
    +  public List<CarbonColumn> getIndexedColumns(DataMapSchema dataMapSchema)
    --- End diff --
   
    There are 15 usages of this function, and they all have the dataMapSchema. If I change it to take columnNames as the input parameter, all 15 usages need to change and more code is added, so I think it is better to keep it this way, as a helper function.


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185431350
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -986,4 +990,26 @@ public boolean canAllow(CarbonTable carbonTable, TableOperation operation) {
         return true;
       }
     
    +  /**
    +   * Get all index columns specified by dataMapSchema
    +   */
    +  public List<CarbonColumn> getIndexedColumns(DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    String[] columns = DataMapFactory.getIndexColumns(dataMapSchema);
    +    List<CarbonColumn> indexColumn = new ArrayList<>(columns.length);
    +    for (String column : columns) {
    +      CarbonColumn carbonColumn = getColumnByName(getTableName(), column.trim().toLowerCase());
    +      if (carbonColumn == null) {
    +        throw new MalformedDataMapCommandException(String.format(
    +            "column '%s' does not exist in table. Please check create DataMap statement.",
    +            column));
    +      }
    +      if (carbonColumn.getColName().isEmpty()) {
    +        throw new MalformedDataMapCommandException(
    +            DataMapFactory.INDEX_COLUMNS + " contains invalid column name");
    --- End diff --
   
    ok


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185432739
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---
    @@ -36,25 +41,39 @@
     @InterfaceStability.Evolving
     public abstract class DataMapWriter {
     
    -  protected AbsoluteTableIdentifier identifier;
    +  protected String tablePath;
     
       protected String segmentId;
     
    -  protected String writeDirectoryPath;
    +  protected String dataMapPath;
     
    -  public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
    -      String writeDirectoryPath) {
    -    this.identifier = identifier;
    +  private List<CarbonColumn> indexColumns;
    +
    +  public DataMapWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment,
    +      String shardName) throws MalformedDataMapCommandException {
    +    this(carbonTable.getTablePath(), dataMapSchema.getDataMapName(),
    +        carbonTable.getIndexedColumns(dataMapSchema), segment, shardName);
    +  }
    +
    +  public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
    +      Segment segment, String shardName) {
    +    this.tablePath = tablePath;
         this.segmentId = segment.getSegmentNo();
    -    this.writeDirectoryPath = writeDirectoryPath;
    +    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
    +        tablePath, segmentId, dataMapName, shardName);
    +    this.indexColumns = indexColumns;
    +  }
    +
    +  protected final List<CarbonColumn> getIndexColumns() {
    +    return indexColumns;
       }
     
       /**
        * Start of new block notification.
        *
        * @param blockId file name of the carbondata file
        */
    -  public abstract void onBlockStart(String blockId, String indexShardName) throws IOException;
    +  public abstract void onBlockStart(String blockId) throws IOException;
    --- End diff --
   
    Currently IOException is enough, since all datamap implementations write some data into files.


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185433421
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---
    @@ -66,22 +85,22 @@ public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletStart(int blockletId);
    +  public abstract void onBlockletStart(int blockletId) throws IOException;
     
       /**
        * End of blocklet notification
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletEnd(int blockletId);
    +  public abstract void onBlockletEnd(int blockletId) throws IOException;
     
       /**
    -   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
    +   * Add row data to the datamap, order of field is same as `indexColumns` in
        * DataMapMeta returned in DataMapFactory.
    -   * Implementation should copy the content of `pages` as needed, because `pages` memory
    -   * may be freed after this method returns, if using unsafe column page.
    +   * Implementation should copy the content of `row` as needed, because its memory
    +   * may be freed after this method returns, in case of unsafe memory
        */
    -  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    +  public abstract void addRow(int blockletId, int pageId, int rowId, CarbonRow row)
    --- End diff --
   
    Currently all datamap implementations (lucene, bloom filter, minmax) add index entries row by row, even though the framework gives ColumnPage data to the implementation. I don't think that is a coincidence, so I changed the interface to pass row data.
    I will rename it to `onRowAdded`.
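The row-by-row pattern described here might look like the following minmax-style writer, which resets its state at blocklet start, updates it per row, and records it at blocklet end. `MinMaxWriterSketch` is hypothetical, indexes a single int column at position 0, and takes `Object[]` where the diff uses `CarbonRow`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minmax-style writer that consumes index data row by row.
class MinMaxWriterSketch {
  // one {min, max} pair per finished blocklet
  final List<int[]> blockletStats = new ArrayList<>();
  private int min;
  private int max;

  void onBlockletStart(int blockletId) {
    // reset per-blocklet state
    min = Integer.MAX_VALUE;
    max = Integer.MIN_VALUE;
  }

  void onRowAdded(int blockletId, int pageId, int rowId, Object[] row) {
    int value = (Integer) row[0];
    min = Math.min(min, value);
    max = Math.max(max, value);
  }

  void onBlockletEnd(int blockletId) {
    // a real writer would persist this entry to the datamap file
    blockletStats.add(new int[] {min, max});
  }
}
```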


---

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185434672
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java ---
    @@ -21,142 +21,124 @@
     import java.io.IOException;
     import java.io.ObjectOutputStream;
     import java.util.ArrayList;
    -import java.util.HashMap;
     import java.util.List;
    -import java.util.Map;
     
     import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.constants.CarbonCommonConstants;
    -import org.apache.carbondata.core.datamap.DataMapMeta;
     import org.apache.carbondata.core.datamap.Segment;
     import org.apache.carbondata.core.datamap.dev.DataMapWriter;
     import org.apache.carbondata.core.datastore.impl.FileFactory;
    -import org.apache.carbondata.core.datastore.page.ColumnPage;
    -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
     import org.apache.carbondata.core.metadata.datatype.DataType;
     import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.core.util.CarbonUtil;
     
     import com.google.common.hash.BloomFilter;
     import com.google.common.hash.Funnels;
     
     /**
    - * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is
    - * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that
    - * belongs to same block will be written to one index file suffixed with .bloomindex. So the number
    + * BloomDataMap is constructed in CG level (blocklet level).
    + * For each indexed column, a bloom filter is constructed to indicate whether a value
    + * belongs to this blocklet. Bloom filter of blocklet that belongs to same block will
    + * be written to one index file suffixed with .bloomindex. So the number
      * of bloom index file will be equal to that of the blocks.
      */
     @InterfaceAudience.Internal
     public class BloomDataMapWriter extends DataMapWriter {
    -  private String dataMapName;
    -  private List<String> indexedColumns;
    +  private static final LogService LOG = LogServiceFactory.getLogService(
    +      BloomDataMapWriter.class.getCanonicalName());
       private int bloomFilterSize;
    -  // map column name to ordinal in pages
    -  private Map<String, Integer> col2Ordianl;
    -  private Map<String, DataType> col2DataType;
    -  private String indexShardName;
    -  private int currentBlockletId;
    +  protected int currentBlockletId;
       private List<String> currentDMFiles;
       private List<DataOutputStream> currentDataOutStreams;
       private List<ObjectOutputStream> currentObjectOutStreams;
       private List<BloomFilter<byte[]>> indexBloomFilters;
     
    -  @InterfaceAudience.Internal
    -  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
    -      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
    -    super(identifier, segment, writeDirectoryPath);
    -    dataMapName = dataMapMeta.getDataMapName();
    -    indexedColumns = dataMapMeta.getIndexedColumns();
    +  BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
    +      Segment segment, String shardName, int bloomFilterSize) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
         this.bloomFilterSize = bloomFilterSize;
    -    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
    -    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
     
    -    currentDMFiles = new ArrayList<String>(indexedColumns.size());
    -    currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
    -    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
    -
    -    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
    +    currentDMFiles = new ArrayList<String>(indexColumns.size());
    +    currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
    +    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size());
    +    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size());
    +    initDataMapFile();
    +    resetBloomFilters();
       }
     
       @Override
    -  public void onBlockStart(String blockId, String indexShardName) throws IOException {
    -    if (this.indexShardName == null) {
    -      this.indexShardName = indexShardName;
    -      initDataMapFile();
    -    }
    +  public void onBlockStart(String blockId) throws IOException {
       }
     
       @Override
       public void onBlockEnd(String blockId) throws IOException {
    -
       }
     
       @Override
       public void onBlockletStart(int blockletId) {
    -    this.currentBlockletId = blockletId;
    +  }
    +
    +  protected void resetBloomFilters() {
         indexBloomFilters.clear();
    -    for (int i = 0; i < indexedColumns.size(); i++) {
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
           indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
               bloomFilterSize, 0.00001d));
         }
       }
     
       @Override
       public void onBlockletEnd(int blockletId) {
    -    try {
    -      writeBloomDataMapFile();
    -    } catch (Exception e) {
    -      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
    -        CarbonUtil.closeStreams(objectOutputStream);
    -      }
    -      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
    -        CarbonUtil.closeStreams(dataOutputStream);
    -      }
    -      throw new RuntimeException(e);
    -    }
    +    writeBloomDataMapFile();
    +    currentBlockletId++;
       }
     
    -  // notice that the input pages only contains the indexed columns
       @Override
    -  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    -      throws IOException {
    -    col2Ordianl.clear();
    -    col2DataType.clear();
    -    for (int colId = 0; colId < pages.length; colId++) {
    -      String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
    -      col2Ordianl.put(columnName, colId);
    -      DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
    -      col2DataType.put(columnName, columnType);
    -    }
    +  public void addRow(int blockletId, int pageId, int rowId, CarbonRow row) {
    +    addRow(row.getData());
    +  }
     
    -    // for each row
    -    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
    -      // for each indexed column
    -      for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
    -        String indexedCol = indexedColumns.get(indexColId);
    -        byte[] indexValue;
    -        if (DataTypes.STRING == col2DataType.get(indexedCol)
    -            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
    -          byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = new byte[originValue.length - 2];
    -          System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
    -        } else {
    -          Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
    -        }
    -
    -        indexBloomFilters.get(indexColId).put(indexValue);
    +  protected void addRow(Object[] rowData) {
    +    // for each indexed column, add the data to bloom filter
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      DataType dataType = indexColumns.get(i).getDataType();
    +      byte[] indexValue;
    +      if (DataTypes.STRING == dataType) {
    --- End diff --
   
    getStringData needs to be overridden by the subclass (BloomDataMapRefresher): BloomDataMapRefresher gets its row data from IndexDataMapRefreshRDD.internalCompute, so it needs a different implementation.
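A minimal sketch of the arrangement described above, assuming a template-method design: the base writer's addRow loop stays fixed, and only the string extraction is a protected hook that the refresher subclass overrides. BloomRowWriterSketch, BloomRefresherSketch, the isStringColumn flags and the List standing in for the bloom filters are all hypothetical. The load-path hook mirrors the removed code in the diff, which skips the first 2 bytes of a STRING value read from a column page. String.valueOf is only a stand-in for CarbonUtil.getValueAsBytes, and the assumption that the refresher receives plain String values is not confirmed by the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical base writer; a List of byte[] stands in for the bloom filters. */
class BloomRowWriterSketch {
  final List<byte[]> indexedValues = new ArrayList<>();

  /** Template method: the per-column loop is fixed, string extraction is a hook. */
  protected void addRow(Object[] rowData, boolean[] isStringColumn) {
    for (int i = 0; i < rowData.length; i++) {
      byte[] indexValue;
      if (isStringColumn[i]) {
        indexValue = getStringData(rowData[i]);
      } else {
        // Stand-in for CarbonUtil.getValueAsBytes (assumption)
        indexValue = String.valueOf(rowData[i]).getBytes(StandardCharsets.UTF_8);
      }
      indexedValues.add(indexValue);
    }
  }

  /** Load path: skip the 2-byte prefix, as the removed arraycopy code did. */
  protected byte[] getStringData(Object value) {
    byte[] origin = (byte[]) value;
    return Arrays.copyOfRange(origin, 2, origin.length);
  }
}

/** Hypothetical refresher: assumes rows arrive as plain Strings, so it overrides the hook. */
class BloomRefresherSketch extends BloomRowWriterSketch {
  @Override
  protected byte[] getStringData(Object value) {
    return ((String) value).getBytes(StandardCharsets.UTF_8);
  }
}
```

Keeping the loop in the base class means only the data-source-specific conversion differs between the load path and the refresh path.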


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185434808
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java ---
    @@ -45,14 +51,40 @@
     public class DataMapRegistry {
       private static Map<String, String> shortNameToClassName = new ConcurrentHashMap<>();
     
    -  public static void registerDataMap(String datamapClassName, String shortName) {
    +  private static void registerDataMap(String datamapClassName, String shortName) {
         Objects.requireNonNull(datamapClassName);
         Objects.requireNonNull(shortName);
         shortNameToClassName.put(shortName, datamapClassName);
       }
     
    -  public static String getDataMapClassName(String shortName) {
    +  private static String getDataMapClassName(String shortName) {
         Objects.requireNonNull(shortName);
         return shortNameToClassName.get(shortName);
       }
    +
    +  public static DataMapFactory<? extends DataMap> getDataMapByShortName(
    --- End diff --
   
    ok, fixed
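The diff makes registerDataMap and getDataMapClassName private and exposes only getDataMapByShortName, which returns a factory. A minimal sketch of that shape, assuming the public method resolves the short name and instantiates the factory by reflection through a no-arg constructor. The real method's parameters are cut off in the diff, and FactorySketch, BloomFactorySketch, DataMapRegistrySketch and the "bloomfilter" mapping are illustrative names only.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for DataMapFactory. */
interface FactorySketch {
  String name();
}

/** Hypothetical factory; assumes a public no-arg constructor for reflective creation. */
class BloomFactorySketch implements FactorySketch {
  public BloomFactorySketch() {
  }

  @Override
  public String name() {
    return "bloom";
  }
}

/** Sketch of a registry whose mutation and raw class-name lookup stay private. */
final class DataMapRegistrySketch {
  private static final Map<String, String> SHORT_NAME_TO_CLASS_NAME = new ConcurrentHashMap<>();

  static {
    // Built-in mapping registered once, inside the class (illustrative)
    register(BloomFactorySketch.class.getName(), "bloomfilter");
  }

  private DataMapRegistrySketch() {
  }

  private static void register(String className, String shortName) {
    Objects.requireNonNull(className);
    Objects.requireNonNull(shortName);
    SHORT_NAME_TO_CLASS_NAME.put(shortName, className);
  }

  /** Public entry point: resolve the short name, then instantiate the factory by reflection. */
  static FactorySketch getFactoryByShortName(String shortName) {
    Objects.requireNonNull(shortName);
    String className = SHORT_NAME_TO_CLASS_NAME.get(shortName);
    if (className == null) {
      throw new IllegalArgumentException("Unknown datamap short name: " + shortName);
    }
    try {
      Class<?> clazz = Class.forName(className);
      return (FactorySketch) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }
}
```

Callers then never handle raw class names; they ask for a factory by short name and get either an instance or a clear error.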


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185434908
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -494,7 +524,7 @@ public static long getTaskIdFromTaskNo(String taskNo) {
         }
     
         /**
    -     * Return the batch number from taskNo string
    +     * Return the batch number from taskNo stringx
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185435123
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
         if (tableIndices != null) {
           for (TableDataMap tableDataMap : tableIndices) {
             DataMapFactory factory = tableDataMap.getDataMapFactory();
    -        register(factory, segmentId, dataWritePath);
    +        register(factory, segmentId, taskNo);
           }
         }
       }
     
       /**
        * Register a DataMapWriter
        */
    -  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
    +  private void register(DataMapFactory factory, String segmentId, String taskNo) {
         assert (factory != null);
         assert (segmentId != null);
         DataMapMeta meta = factory.getMeta();
         if (meta == null) {
           // if data map does not have meta, no need to register
           return;
         }
    -    List<String> columns = factory.getMeta().getIndexedColumns();
    +    List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
         List<DataMapWriter> writers = registry.get(columns);
    -    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
    +    DataMapWriter writer = null;
    +    try {
    +      writer = factory.createWriter(new Segment(segmentId), taskNo);
    +    } catch (IOException e) {
    +      LOG.error(e);
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185435827
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
         if (tableIndices != null) {
           for (TableDataMap tableDataMap : tableIndices) {
             DataMapFactory factory = tableDataMap.getDataMapFactory();
    -        register(factory, segmentId, dataWritePath);
    +        register(factory, segmentId, taskNo);
           }
         }
       }
     
       /**
        * Register a DataMapWriter
        */
    -  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
    +  private void register(DataMapFactory factory, String segmentId, String taskNo) {
         assert (factory != null);
         assert (segmentId != null);
         DataMapMeta meta = factory.getMeta();
         if (meta == null) {
           // if data map does not have meta, no need to register
           return;
         }
    -    List<String> columns = factory.getMeta().getIndexedColumns();
    +    List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
         List<DataMapWriter> writers = registry.get(columns);
    -    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
    +    DataMapWriter writer = null;
    +    try {
    +      writer = factory.createWriter(new Segment(segmentId), taskNo);
    +    } catch (IOException e) {
    +      LOG.error(e);
    +      throw new RuntimeException(e);
    --- End diff --
   
    I created DataMapWriterException
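A sketch of the pattern, assuming DataMapWriterException is an unchecked exception (its superclass is not shown in the thread): the checked IOException from createWriter is wrapped once, with segment and task context, instead of being rethrown as a bare RuntimeException. DataMapWriterExceptionSketch, WriterFactorySketch and WriterListenerSketch are hypothetical names; String and AutoCloseable stand in for Segment and DataMapWriter, and the LOG.error call from the diff is omitted.

```java
import java.io.IOException;

/** Hypothetical dedicated unchecked exception for writer-creation failures. */
class DataMapWriterExceptionSketch extends RuntimeException {
  DataMapWriterExceptionSketch(String message, Throwable cause) {
    super(message, cause);
  }
}

/** Hypothetical factory whose createWriter may fail with an IOException. */
interface WriterFactorySketch {
  AutoCloseable createWriter(String segmentId, String shardName) throws IOException;
}

final class WriterListenerSketch {
  private WriterListenerSketch() {
  }

  /** Wrap the checked IOException with context instead of a bare RuntimeException. */
  static AutoCloseable register(WriterFactorySketch factory, String segmentId, String taskNo) {
    try {
      return factory.createWriter(segmentId, taskNo);
    } catch (IOException e) {
      throw new DataMapWriterExceptionSketch(
          "Failed to create datamap writer for segment " + segmentId + ", task " + taskNo, e);
    }
  }
}
```

A dedicated type lets upper layers catch datamap-writer failures specifically while the original IOException stays available as the cause.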


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185460028
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -130,16 +137,26 @@ public void onBlockletEnd(int blockletId) {
        * @param tablePage  page data
        */
       public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException {
    -    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
    -    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
    -      List<String> indexedColumns = entry.getKey();
    +    Set<Map.Entry<List<CarbonColumn>, List<DataMapWriter>>> entries = registry.entrySet();
    +    for (Map.Entry<List<CarbonColumn>, List<DataMapWriter>> entry : entries) {
    +      List<CarbonColumn> indexedColumns = entry.getKey();
           ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
           for (int i = 0; i < indexedColumns.size(); i++) {
    -        pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
    +        pages[i] = tablePage.getColumnPage(indexedColumns.get(i).getColName());
           }
           List<DataMapWriter> writers = entry.getValue();
    -      for (DataMapWriter writer : writers) {
    -        writer.onPageAdded(blockletId, pageId, pages);
    +      int pageSize = pages[0].getPageSize();
    +
    +      // add every row in the page to writer
    +      for (int rowId = 0; rowId < pageSize; rowId++) {
    +        Object[] rowData = new Object[indexedColumns.size()];
    +        for (int k = 0; k < rowData.length; k++) {
    +          rowData[k] = pages[k].getData(rowId);
    +        }
    +        CarbonRow row = new CarbonRow(rowData);
    --- End diff --
   
    OK, I will change it back to pass ColumnPage.
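For reference, the row-by-row conversion in this diff (which the author agrees to revert in favour of passing ColumnPage) amounts to transposing columnar pages into rows. A minimal sketch, with a hypothetical PageSketch stand-in that exposes only the two ColumnPage calls the diff uses, getPageSize() and getData(rowId); ArrayPageSketch and PageTransposeSketch are also illustrative.

```java
import java.util.function.Consumer;

/** Minimal stand-in for ColumnPage: only the two calls the diff uses. */
interface PageSketch {
  int getPageSize();

  Object getData(int rowId);
}

/** Hypothetical page backed by an array. */
final class ArrayPageSketch implements PageSketch {
  private final Object[] values;

  ArrayPageSketch(Object... values) {
    this.values = values;
  }

  @Override
  public int getPageSize() {
    return values.length;
  }

  @Override
  public Object getData(int rowId) {
    return values[rowId];
  }
}

final class PageTransposeSketch {
  private PageTransposeSketch() {
  }

  /** Emit one Object[] per row; field k comes from pages[k], in indexed-column order. */
  static void forEachRow(PageSketch[] pages, Consumer<Object[]> rowConsumer) {
    int pageSize = pages[0].getPageSize();
    for (int rowId = 0; rowId < pageSize; rowId++) {
      Object[] rowData = new Object[pages.length];
      for (int k = 0; k < rowData.length; k++) {
        rowData[k] = pages[k].getData(rowId);
      }
      rowConsumer.accept(rowData);
    }
  }
}
```

Each row costs one Object[] allocation plus one getData call per column, which is one plausible reason to hand writers the pages directly instead.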


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185465559
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap.dev;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +
    +@InterfaceAudience.Developer("DataMap")
    +public interface DataMapRefresher {
    --- End diff --
   
    Because the Refresher receives data row by row, its interface is different from DataMapWriter.
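The diff shows only the declaration of DataMapRefresher, so the following is a hypothetical sketch of what a row-oriented contract could look like: a refresher is fed one row at a time, with no blocklet or page callbacks, unlike DataMapWriter. RefresherSketch, CollectingRefresherSketch, RefreshDriverSketch and the initialize/addRow/finish method names are all illustrative, not the real interface.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical row-oriented contract; the real interface's methods are not shown. */
interface RefresherSketch {
  void initialize() throws IOException;

  void addRow(Object[] indexedValues) throws IOException;

  void finish() throws IOException;
}

/** Toy implementation that records the first indexed field of every row. */
final class CollectingRefresherSketch implements RefresherSketch {
  final List<Object> keys = new ArrayList<>();
  boolean finished;

  @Override
  public void initialize() {
    keys.clear();
    finished = false;
  }

  @Override
  public void addRow(Object[] indexedValues) {
    keys.add(indexedValues[0]);
  }

  @Override
  public void finish() {
    finished = true;
  }
}

final class RefreshDriverSketch {
  private RefreshDriverSketch() {
  }

  /** Feeds rows one at a time; no blocklet or page lifecycle, unlike DataMapWriter. */
  static void refresh(RefresherSketch refresher, Iterable<Object[]> rows) throws IOException {
    refresher.initialize();
    for (Object[] row : rows) {
      refresher.addRow(row);
    }
    refresher.finish();
  }
}
```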


---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5585/



---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4424/



---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185488673
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---
    @@ -66,22 +85,22 @@ public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletStart(int blockletId);
    +  public abstract void onBlockletStart(int blockletId) throws IOException;
     
       /**
        * End of blocklet notification
        *
        * @param blockletId sequence number of blocklet in the block
        */
    -  public abstract void onBlockletEnd(int blockletId);
    +  public abstract void onBlockletEnd(int blockletId) throws IOException;
     
       /**
    -   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
    +   * Add row data to the datamap, order of field is same as `indexColumns` in
        * DataMapMeta returned in DataMapFactory.
    -   * Implementation should copy the content of `pages` as needed, because `pages` memory
    -   * may be freed after this method returns, if using unsafe column page.
    +   * Implementation should copy the content of `row` as needed, because its memory
    +   * may be freed after this method returns, in case of unsafe memory
        */
    -  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    +  public abstract void addRow(int blockletId, int pageId, int rowId, CarbonRow row)
    --- End diff --
   
    ok, fixed
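The revised Javadoc says an addRow implementation should copy what it keeps, because the row's memory may be freed after the method returns when unsafe memory is used. A minimal sketch of that contract, assuming a caller that reuses one buffer across rows; CopyingWriterSketch is hypothetical, and a byte[] field stands in for CarbonRow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical writer illustrating the addRow contract: the caller may reuse or
 * free the memory behind a row once addRow returns, so retained data is copied.
 */
final class CopyingWriterSketch {
  final List<byte[]> retained = new ArrayList<>();
  private int currentBlockletId = -1;

  void onBlockletStart(int blockletId) {
    currentBlockletId = blockletId;
  }

  void addRow(int blockletId, int pageId, int rowId, byte[] field) {
    if (blockletId != currentBlockletId) {
      throw new IllegalStateException("addRow outside blocklet " + blockletId);
    }
    // Keep a private copy; storing 'field' itself would alias the caller's buffer
    retained.add(Arrays.copyOf(field, field.length));
  }

  void onBlockletEnd(int blockletId) {
    currentBlockletId = -1;
  }
}
```

If addRow stored `field` directly, a caller that refills one buffer for every row would leave all retained entries pointing at the last row's bytes.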


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185488751
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
    --- End diff --
   
    ok, removed
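A minimal sketch of what turning DataMapFactory into an abstract class allows: the table is bound once in the constructor and held as protected state, so init only needs the schema. TableSketch and SchemaSketch are hypothetical stand-ins for CarbonTable and DataMapSchema. The resolveIndexColumns helper is also hypothetical; the new INDEX_COLUMNS constant and StringUtils import suggest some shared column handling, but its actual form is not shown. IllegalArgumentException stands in for MalformedDataMapCommandException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical stand-in for CarbonTable. */
final class TableSketch {
  final String name;
  final List<String> columns;

  TableSketch(String name, List<String> columns) {
    this.name = name;
    this.columns = columns;
  }
}

/** Hypothetical stand-in for DataMapSchema; indexColumns is e.g. "c1,c2". */
final class SchemaSketch {
  final String dataMapName;
  final String indexColumns;

  SchemaSketch(String dataMapName, String indexColumns) {
    this.dataMapName = dataMapName;
    this.indexColumns = indexColumns;
  }
}

/** The table is bound in the constructor, so init() only needs the schema. */
abstract class FactoryBaseSketch {
  protected final TableSketch table;

  protected FactoryBaseSketch(TableSketch table) {
    this.table = table;
  }

  public abstract void init(SchemaSketch schema);

  /** Hypothetical shared helper: parse and validate index columns against the table. */
  protected List<String> resolveIndexColumns(SchemaSketch schema) {
    List<String> result = new ArrayList<>();
    for (String raw : schema.indexColumns.split(",")) {
      String col = raw.trim().toLowerCase(Locale.ROOT);
      if (!table.columns.contains(col)) {
        throw new IllegalArgumentException("Column " + col + " not found in table " + table.name);
      }
      result.add(col);
    }
    return result;
  }
}

/** Hypothetical concrete factory. */
final class BloomFactoryImplSketch extends FactoryBaseSketch {
  List<String> indexColumns;

  BloomFactoryImplSketch(TableSketch table) {
    super(table);
  }

  @Override
  public void init(SchemaSketch schema) {
    indexColumns = resolveIndexColumns(schema);
  }
}
```

An interface could offer default methods but not hold the table as state, which is what the constructor injection here relies on.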


---