[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185235683
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---
    @@ -36,25 +41,39 @@
     @InterfaceStability.Evolving
     public abstract class DataMapWriter {
     
    -  protected AbsoluteTableIdentifier identifier;
    +  protected String tablePath;
     
       protected String segmentId;
     
    -  protected String writeDirectoryPath;
    +  protected String dataMapPath;
     
    -  public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
    -      String writeDirectoryPath) {
    -    this.identifier = identifier;
    +  private List<CarbonColumn> indexColumns;
    +
    +  public DataMapWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment,
    +      String shardName) throws MalformedDataMapCommandException {
    +    this(carbonTable.getTablePath(), dataMapSchema.getDataMapName(),
    +        carbonTable.getIndexedColumns(dataMapSchema), segment, shardName);
    +  }
    +
    +  public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
    +      Segment segment, String shardName) {
    +    this.tablePath = tablePath;
         this.segmentId = segment.getSegmentNo();
    -    this.writeDirectoryPath = writeDirectoryPath;
    +    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
    +        tablePath, segmentId, dataMapName, shardName);
    +    this.indexColumns = indexColumns;
    +  }
    +
    +  protected final List<CarbonColumn> getIndexColumns() {
    +    return indexColumns;
       }
     
       /**
        * Start of new block notification.
        *
        * @param blockId file name of the carbondata file
        */
    -  public abstract void onBlockStart(String blockId, String indexShardName) throws IOException;
    +  public abstract void onBlockStart(String blockId) throws IOException;
    --- End diff --
   
    Why not define a dedicated exception that wraps the exceptions thrown while writing the datamap?
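
A minimal sketch of the suggestion above, for illustration only. `DataMapWriteException`, `BlockCallback` and `WrappingExample` are hypothetical names that do not appear in the pull request; the sketch only shows a low-level `IOException` being rethrown as one dedicated exception type with the original kept as the cause.

```java
import java.io.IOException;

// Hypothetical checked exception; not taken from the CarbonData code base.
class DataMapWriteException extends Exception {
  DataMapWriteException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical stand-in for a writer callback that can fail with a low-level error.
interface BlockCallback {
  void onBlockStart(String blockId) throws IOException;
}

final class WrappingExample {
  // Runs the callback and rethrows any IOException as DataMapWriteException,
  // keeping the original exception as the cause.
  static void startBlock(BlockCallback callback, String blockId)
      throws DataMapWriteException {
    try {
      callback.onBlockStart(blockId);
    } catch (IOException e) {
      throw new DataMapWriteException("Failed to start block " + blockId, e);
    }
  }
}
```

With this shape, callers catch a single exception type and can still reach the underlying failure through `getCause()`.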


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185238634
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java ---
    @@ -21,142 +21,124 @@
     import java.io.IOException;
     import java.io.ObjectOutputStream;
     import java.util.ArrayList;
    -import java.util.HashMap;
     import java.util.List;
    -import java.util.Map;
     
     import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.constants.CarbonCommonConstants;
    -import org.apache.carbondata.core.datamap.DataMapMeta;
     import org.apache.carbondata.core.datamap.Segment;
     import org.apache.carbondata.core.datamap.dev.DataMapWriter;
     import org.apache.carbondata.core.datastore.impl.FileFactory;
    -import org.apache.carbondata.core.datastore.page.ColumnPage;
    -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
     import org.apache.carbondata.core.metadata.datatype.DataType;
     import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.core.util.CarbonUtil;
     
     import com.google.common.hash.BloomFilter;
     import com.google.common.hash.Funnels;
     
     /**
    - * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is
    - * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that
    - * belongs to same block will be written to one index file suffixed with .bloomindex. So the number
    + * BloomDataMap is constructed in CG level (blocklet level).
    + * For each indexed column, a bloom filter is constructed to indicate whether a value
    + * belongs to this blocklet. Bloom filter of blocklet that belongs to same block will
    + * be written to one index file suffixed with .bloomindex. So the number
      * of bloom index file will be equal to that of the blocks.
      */
     @InterfaceAudience.Internal
     public class BloomDataMapWriter extends DataMapWriter {
    -  private String dataMapName;
    -  private List<String> indexedColumns;
    +  private static final LogService LOG = LogServiceFactory.getLogService(
    +      BloomDataMapWriter.class.getCanonicalName());
       private int bloomFilterSize;
    -  // map column name to ordinal in pages
    -  private Map<String, Integer> col2Ordianl;
    -  private Map<String, DataType> col2DataType;
    -  private String indexShardName;
    -  private int currentBlockletId;
    +  protected int currentBlockletId;
       private List<String> currentDMFiles;
       private List<DataOutputStream> currentDataOutStreams;
       private List<ObjectOutputStream> currentObjectOutStreams;
       private List<BloomFilter<byte[]>> indexBloomFilters;
     
    -  @InterfaceAudience.Internal
    -  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
    -      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
    -    super(identifier, segment, writeDirectoryPath);
    -    dataMapName = dataMapMeta.getDataMapName();
    -    indexedColumns = dataMapMeta.getIndexedColumns();
    +  BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
    +      Segment segment, String shardName, int bloomFilterSize) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
         this.bloomFilterSize = bloomFilterSize;
    -    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
    -    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
     
    -    currentDMFiles = new ArrayList<String>(indexedColumns.size());
    -    currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
    -    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
    -
    -    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
    +    currentDMFiles = new ArrayList<String>(indexColumns.size());
    +    currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
    +    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size());
    +    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size());
    +    initDataMapFile();
    +    resetBloomFilters();
       }
     
       @Override
    -  public void onBlockStart(String blockId, String indexShardName) throws IOException {
    -    if (this.indexShardName == null) {
    -      this.indexShardName = indexShardName;
    -      initDataMapFile();
    -    }
    +  public void onBlockStart(String blockId) throws IOException {
       }
     
       @Override
       public void onBlockEnd(String blockId) throws IOException {
    -
       }
     
       @Override
       public void onBlockletStart(int blockletId) {
    -    this.currentBlockletId = blockletId;
    +  }
    +
    +  protected void resetBloomFilters() {
         indexBloomFilters.clear();
    -    for (int i = 0; i < indexedColumns.size(); i++) {
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
           indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
               bloomFilterSize, 0.00001d));
         }
       }
     
       @Override
       public void onBlockletEnd(int blockletId) {
    -    try {
    -      writeBloomDataMapFile();
    -    } catch (Exception e) {
    -      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
    -        CarbonUtil.closeStreams(objectOutputStream);
    -      }
    -      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
    -        CarbonUtil.closeStreams(dataOutputStream);
    -      }
    -      throw new RuntimeException(e);
    -    }
    +    writeBloomDataMapFile();
    +    currentBlockletId++;
       }
     
    -  // notice that the input pages only contains the indexed columns
       @Override
    -  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    -      throws IOException {
    -    col2Ordianl.clear();
    -    col2DataType.clear();
    -    for (int colId = 0; colId < pages.length; colId++) {
    -      String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
    -      col2Ordianl.put(columnName, colId);
    -      DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
    -      col2DataType.put(columnName, columnType);
    -    }
    +  public void addRow(int blockletId, int pageId, int rowId, CarbonRow row) {
    +    addRow(row.getData());
    +  }
     
    -    // for each row
    -    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
    -      // for each indexed column
    -      for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
    -        String indexedCol = indexedColumns.get(indexColId);
    -        byte[] indexValue;
    -        if (DataTypes.STRING == col2DataType.get(indexedCol)
    -            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
    -          byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = new byte[originValue.length - 2];
    -          System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
    -        } else {
    -          Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
    -        }
    -
    -        indexBloomFilters.get(indexColId).put(indexValue);
    +  protected void addRow(Object[] rowData) {
    +    // for each indexed column, add the data to bloom filter
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      DataType dataType = indexColumns.get(i).getDataType();
    +      byte[] indexValue;
    +      if (DataTypes.STRING == dataType) {
    --- End diff --
   
    The procedure for string and byte_array is the same, so there is no need to distinguish them.
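
A sketch of what a merged branch could look like. It is modeled on the removed `onPageAdded` code in the diff above, which stripped a 2-byte prefix from both string and byte-array values; whether the new `addRow` still needs that step is an assumption. `ColumnType` and `IndexValueExample` are hypothetical stand-ins, not CarbonData classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class IndexValueExample {
  // Hypothetical stand-in for the data types named in the review comment.
  enum ColumnType { STRING, BYTE_ARRAY, INT }

  // One branch handles both STRING and BYTE_ARRAY: drop the first two bytes
  // (assumed to be a length prefix, as in the removed code) and keep the rest.
  static byte[] toIndexValue(ColumnType type, Object value) {
    if (type == ColumnType.STRING || type == ColumnType.BYTE_ARRAY) {
      byte[] origin = (byte[]) value;
      return Arrays.copyOfRange(origin, 2, origin.length);
    }
    // Other types are converted to bytes; only INT is shown in this sketch.
    return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) value).array();
  }
}
```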


---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/4658/



---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185394341
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    +      CarbonTable carbonTable) throws MalformedDataMapCommandException {
    +    List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +    Set<String> unique = new HashSet<>();
    +    for (CarbonColumn indexColumn : indexColumns) {
    +      unique.add(indexColumn.getColName());
    +    }
    +    if (unique.size() != indexColumns.size()) {
    +      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
    +    }
    +  }
    +
    +  public static String[] getIndexColumns(DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    String columns = dataMapSchema.getProperties().get(INDEX_COLUMNS);
    +    if (columns == null) {
    +      columns = dataMapSchema.getProperties().get(INDEX_COLUMNS.toLowerCase());
    +    }
    +    if (columns == null) {
    +      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is required");
    +    } else if (StringUtils.isBlank(columns)) {
    --- End diff --
   
    yes


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185394494
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    --- End diff --
   
    I assume you mean `private`, but it needs to be accessible from IndexDataMapProvider and the Lucene datamap factory.


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185394539
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    +      CarbonTable carbonTable) throws MalformedDataMapCommandException {
    +    List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +    Set<String> unique = new HashSet<>();
    +    for (CarbonColumn indexColumn : indexColumns) {
    +      unique.add(indexColumn.getColName());
    +    }
    +    if (unique.size() != indexColumns.size()) {
    +      throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
    +    }
    +  }
    +
    +  public static String[] getIndexColumns(DataMapSchema dataMapSchema)
    --- End diff --
   
    I assume you mean `private`, but it needs to be accessible from the create datamap command.


---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4411/



---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5572/



---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185401780
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java ---
    @@ -45,14 +51,40 @@
     public class DataMapRegistry {
       private static Map<String, String> shortNameToClassName = new ConcurrentHashMap<>();
     
    -  public static void registerDataMap(String datamapClassName, String shortName) {
    +  private static void registerDataMap(String datamapClassName, String shortName) {
         Objects.requireNonNull(datamapClassName);
         Objects.requireNonNull(shortName);
         shortNameToClassName.put(shortName, datamapClassName);
       }
     
    -  public static String getDataMapClassName(String shortName) {
    +  private static String getDataMapClassName(String shortName) {
         Objects.requireNonNull(shortName);
         return shortNameToClassName.get(shortName);
       }
    +
    +  public static DataMapFactory<? extends DataMap> getDataMapByShortName(
    --- End diff --
   
    The method name should be `getDataMapFactoryByShortName`


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185402531
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
         if (tableIndices != null) {
           for (TableDataMap tableDataMap : tableIndices) {
             DataMapFactory factory = tableDataMap.getDataMapFactory();
    -        register(factory, segmentId, dataWritePath);
    +        register(factory, segmentId, taskNo);
           }
         }
       }
     
       /**
        * Register a DataMapWriter
        */
    -  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
    +  private void register(DataMapFactory factory, String segmentId, String taskNo) {
         assert (factory != null);
         assert (segmentId != null);
         DataMapMeta meta = factory.getMeta();
         if (meta == null) {
           // if data map does not have meta, no need to register
           return;
         }
    -    List<String> columns = factory.getMeta().getIndexedColumns();
    +    List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
         List<DataMapWriter> writers = registry.get(columns);
    -    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
    +    DataMapWriter writer = null;
    +    try {
    +      writer = factory.createWriter(new Segment(segmentId), taskNo);
    +    } catch (IOException e) {
    +      LOG.error(e);
    --- End diff --
   
    It would be better to log a more descriptive error message here.
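
One possible reading of this comment, sketched with hypothetical names (`ErrorMessageExample`, `describeFailure`): build a message that names the failing operation and its inputs, then pass it, together with the exception, to whatever logger the class uses. The exact wording is an assumption, not taken from the pull request.

```java
import java.io.IOException;

final class ErrorMessageExample {
  // Builds a message that states what failed and for which segment and task,
  // so the log line is useful even without reading the stack trace.
  static String describeFailure(String segmentId, String taskNo, IOException cause) {
    return "Failed to create DataMapWriter for segment " + segmentId
        + ", task " + taskNo + ": " + cause.getMessage();
  }
}
```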


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185403032
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -130,16 +137,26 @@ public void onBlockletEnd(int blockletId) {
        * @param tablePage  page data
        */
       public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException {
    -    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
    -    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
    -      List<String> indexedColumns = entry.getKey();
    +    Set<Map.Entry<List<CarbonColumn>, List<DataMapWriter>>> entries = registry.entrySet();
    +    for (Map.Entry<List<CarbonColumn>, List<DataMapWriter>> entry : entries) {
    +      List<CarbonColumn> indexedColumns = entry.getKey();
           ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
           for (int i = 0; i < indexedColumns.size(); i++) {
    -        pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
    +        pages[i] = tablePage.getColumnPage(indexedColumns.get(i).getColName());
           }
           List<DataMapWriter> writers = entry.getValue();
    -      for (DataMapWriter writer : writers) {
    -        writer.onPageAdded(blockletId, pageId, pages);
    +      int pageSize = pages[0].getPageSize();
    +
    +      // add every row in the page to writer
    +      for (int rowId = 0; rowId < pageSize; rowId++) {
    +        Object[] rowData = new Object[indexedColumns.size()];
    +        for (int k = 0; k < rowData.length; k++) {
    +          rowData[k] = pages[k].getData(rowId);
    +        }
    +        CarbonRow row = new CarbonRow(rowData);
    --- End diff --
   
    For lines 152/156, why can't we use the rows in the pages directly?
   
    If that is not possible, is there a better way to avoid generating these objects?
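
One way to reduce the allocations, sketched under assumptions: reuse a single `Object[]` buffer for every row instead of allocating a new array per row. `SimplePage`, `RowConsumer` and `RowBufferReuse` are hypothetical stand-ins, not the CarbonData `ColumnPage` or `DataMapWriter` types, and the approach is only safe if the consumer does not keep a reference to the array after the call returns.

```java
// Hypothetical stand-in for a column page; not the CarbonData ColumnPage class.
interface SimplePage {
  int size();
  Object get(int rowId);
}

// Hypothetical consumer that receives one row at a time.
interface RowConsumer {
  void accept(Object[] row);
}

final class RowBufferReuse {
  private RowBufferReuse() { }

  // Feeds every row of the pages to the consumer, reusing one Object[]
  // instead of allocating a new array per row. Safe only if the consumer
  // does not retain the array after accept() returns.
  static void forEachRow(SimplePage[] pages, RowConsumer consumer) {
    if (pages.length == 0) {
      return;
    }
    int pageSize = pages[0].size();
    Object[] buffer = new Object[pages.length];
    for (int rowId = 0; rowId < pageSize; rowId++) {
      for (int k = 0; k < pages.length; k++) {
        buffer[k] = pages[k].get(rowId);
      }
      consumer.accept(buffer);
    }
  }
}
```

This removes the per-row array, but any boxing done when reading a value out of a page would remain.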


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185400830
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -494,7 +524,7 @@ public static long getTaskIdFromTaskNo(String taskNo) {
         }
     
         /**
    -     * Return the batch number from taskNo string
    +     * Return the batch number from taskNo stringx
    --- End diff --
   
    `stringx`? Is this a typo?


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185402546
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---
    @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
         if (tableIndices != null) {
           for (TableDataMap tableDataMap : tableIndices) {
             DataMapFactory factory = tableDataMap.getDataMapFactory();
    -        register(factory, segmentId, dataWritePath);
    +        register(factory, segmentId, taskNo);
           }
         }
       }
     
       /**
        * Register a DataMapWriter
        */
    -  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
    +  private void register(DataMapFactory factory, String segmentId, String taskNo) {
         assert (factory != null);
         assert (segmentId != null);
         DataMapMeta meta = factory.getMeta();
         if (meta == null) {
           // if data map does not have meta, no need to register
           return;
         }
    -    List<String> columns = factory.getMeta().getIndexedColumns();
    +    List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
         List<DataMapWriter> writers = registry.get(columns);
    -    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
    +    DataMapWriter writer = null;
    +    try {
    +      writer = factory.createWriter(new Segment(segmentId), taskNo);
    +    } catch (IOException e) {
    +      LOG.error(e);
    +      throw new RuntimeException(e);
    --- End diff --
   
    As I mentioned above, should we define a custom exception that extends RuntimeException to wrap the exceptions that may be thrown while creating the DataMapWriter?
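
A minimal sketch of what such a wrapper might look like. The names `DataMapWriterException`, `WriterCreation` and `WriterSupplier` are assumptions made for illustration; none of them is an existing CarbonData class.

```java
import java.io.IOException;

// Hypothetical unchecked exception; the name is an assumption.
class DataMapWriterException extends RuntimeException {
  DataMapWriterException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class WriterCreation {
  private WriterCreation() { }

  // Hypothetical stand-in for a factory call that may throw IOException.
  interface WriterSupplier<T> {
    T create() throws IOException;
  }

  // Runs the supplier and rethrows any IOException as the unchecked
  // wrapper, keeping the original exception as the cause.
  static <T> T createOrWrap(String segmentId, String taskNo, WriterSupplier<T> supplier) {
    try {
      return supplier.create();
    } catch (IOException e) {
      throw new DataMapWriterException(
          "Failed to create DataMapWriter for segment " + segmentId + ", task " + taskNo, e);
    }
  }
}
```

Compared with a plain `RuntimeException`, a dedicated type lets callers catch datamap writer failures specifically while the cause stays attached.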


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185400353
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    --- End diff --
   
    Oh, I thought it could be `protected` at that time.
   
    Actually, these two methods do not use any fields of this class, so I don't think they belong here. Or should we just make them static?
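
The "make them static" option could look roughly like the sketch below, which applies the four checks listed in the quoted Javadoc. `IndexColumnValidator` is a hypothetical name; the comma-separated format of the property value and the case-sensitive comparison are assumptions of this sketch, not confirmed behaviour of the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical utility; names and signature are assumptions for illustration.
final class IndexColumnValidator {
  private IndexColumnValidator() { }

  // Validates a comma-separated INDEX_COLUMNS value and returns the
  // trimmed column names in their original order.
  static List<String> validate(String indexColumns, Set<String> tableColumns) {
    if (indexColumns == null) {
      throw new IllegalArgumentException("INDEX_COLUMNS property is required");
    }
    List<String> result = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    // A limit of -1 keeps trailing empty entries, so "a," is rejected.
    for (String raw : indexColumns.split(",", -1)) {
      String name = raw.trim();
      if (name.isEmpty()) {
        throw new IllegalArgumentException("INDEX_COLUMNS contains an empty or blank column name");
      }
      if (!seen.add(name)) {
        throw new IllegalArgumentException("INDEX_COLUMNS contains a duplicate column: " + name);
      }
      if (!tableColumns.contains(name)) {
        throw new IllegalArgumentException("INDEX_COLUMNS column does not exist in table: " + name);
      }
      result.add(name);
    }
    return result;
  }
}
```

Because the method depends only on its arguments, it can be tested without constructing a factory.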


---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    @jackylk Please check the review comments.


---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/4663/



---

[GitHub] carbondata issue #2254: [CARBONDATA-2415] Support Refresh DataMap command fo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2254
 
    Inspired by the optimization of compaction (#2210), incremental building of datamap can also be optimized.


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185427135
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
    --- End diff --
   
    Why is `init` required for an abstract class? Better to pass the schema to the constructor as well and remove this method.
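
A minimal sketch of the constructor-based alternative, using hypothetical stand-ins (`TableInfo`, `SchemaInfo`, `FactorySketch`, `ExampleFactory`) rather than the real `CarbonTable`, `DataMapSchema` and `DataMapFactory` classes.

```java
// Hypothetical stand-in for CarbonTable.
final class TableInfo {
  final String tablePath;
  TableInfo(String tablePath) { this.tablePath = tablePath; }
}

// Hypothetical stand-in for DataMapSchema.
final class SchemaInfo {
  final String dataMapName;
  SchemaInfo(String dataMapName) { this.dataMapName = dataMapName; }
}

// Both dependencies arrive through the constructor, so the fields can be
// final and no instance exists in a half-initialized state.
abstract class FactorySketch {
  protected final TableInfo table;
  protected final SchemaInfo schema;

  protected FactorySketch(TableInfo table, SchemaInfo schema) {
    if (table == null || schema == null) {
      throw new IllegalArgumentException("table and schema are required");
    }
    this.table = table;
    this.schema = schema;
  }

  abstract String describe();
}

// Example subclass: it only forwards its arguments to the base constructor.
final class ExampleFactory extends FactorySketch {
  ExampleFactory(TableInfo table, SchemaInfo schema) {
    super(table, schema);
  }

  @Override
  String describe() {
    return schema.dataMapName + "@" + table.tablePath;
  }
}
```

One trade-off to keep in mind: if factories are instantiated reflectively by class name (an assumption, not verified here), every subclass would need to expose this constructor signature.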


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185428212
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---
    @@ -27,74 +29,124 @@
     import org.apache.carbondata.core.features.TableOperation;
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
     import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.events.Event;
     
    +import org.apache.commons.lang.StringUtils;
    +
     /**
    - * Interface for datamap factory, it is responsible for creating the datamap.
    + * Interface for datamap of index type, it is responsible for creating the datamap.
      */
    -public interface DataMapFactory<T extends DataMap> {
    +public abstract class DataMapFactory<T extends DataMap> {
    +
    +  public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
    +  protected CarbonTable carbonTable;
    +
    +  public DataMapFactory(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
     
       /**
        * Initialization of Datamap factory with the carbonTable and datamap name
        */
    -  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +  public abstract void init(DataMapSchema dataMapSchema)
           throws IOException, MalformedDataMapCommandException;
     
       /**
        * Return a new write for this datamap
        */
    -  DataMapWriter createWriter(Segment segment, String writeDirectoryPath);
    +  public abstract DataMapWriter createWriter(Segment segment, String shardName)
    +      throws IOException;
    +
    +  public abstract DataMapRefresher createRefresher(Segment segment, String shardName)
    +      throws IOException;
     
       /**
        * Get the datamap for segmentid
        */
    -  List<T> getDataMaps(Segment segment) throws IOException;
    +  public abstract List<T> getDataMaps(Segment segment) throws IOException;
     
       /**
        * Get datamaps for distributable object.
        */
    -  List<T> getDataMaps(DataMapDistributable distributable)
    +  public abstract List<T> getDataMaps(DataMapDistributable distributable)
           throws IOException;
     
       /**
        * Get all distributable objects of a segmentid
        * @return
        */
    -  List<DataMapDistributable> toDistributable(Segment segment);
    +  public abstract List<DataMapDistributable> toDistributable(Segment segment);
     
       /**
        *
        * @param event
        */
    -  void fireEvent(Event event);
    +  public abstract void fireEvent(Event event);
     
       /**
        * Clears datamap of the segment
        */
    -  void clear(Segment segment);
    +  public abstract void clear(Segment segment);
     
       /**
        * Clear all datamaps from memory
        */
    -  void clear();
    +  public abstract void clear();
     
       /**
        * Return metadata of this datamap
        */
    -  DataMapMeta getMeta();
    +  public abstract DataMapMeta getMeta();
     
       /**
        *  Type of datamap whether it is FG or CG
        */
    -  DataMapLevel getDataMapType();
    +  public abstract DataMapLevel getDataMapLevel();
     
       /**
        * delete datamap data if any
        */
    -  void deleteDatamapData();
    +  public abstract void deleteDatamapData();
     
       /**
        * This function should return true is the input operation enum will make the datamap become stale
        */
    -  boolean willBecomeStale(TableOperation operation);
    +  public abstract boolean willBecomeStale(TableOperation operation);
    +
    +  /**
    +   * Validate INDEX_COLUMNS property and return a array containing index column name
    +   * Following will be validated
    +   * 1. require INDEX_COLUMNS property
    +   * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank)
    +   * 3. INDEX_COLUMNS can't contains duplicate same columns
    +   * 4. INDEX_COLUMNS should be exists in table columns
    +   */
    +  public void validateIndexedColumns(DataMapSchema dataMapSchema,
    --- End diff --
   
    I think it is not necessary to pass any arguments to it; both the schema and the table are already present here.
    Also, better to rename the method to just `validate`.


---

[GitHub] carbondata pull request #2254: [CARBONDATA-2415] Support Refresh DataMap com...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185429022
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java ---
    @@ -291,8 +295,8 @@ private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpre
         }
         boolean contains = true;
         for (ColumnExpression expression : columnExpressions) {
    -      if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta
    -          .getOptimizedOperation().containsAll(expressionTypes)) {
    +      if (!mapMeta.getIndexedColumnNames().contains(expression.getColumnName()) ||
    +          !mapMeta.getOptimizedOperation().containsAll(expressionTypes)) {
    --- End diff --
   
    `expressionTypes` comes from the `FilterResolverIntf`, so it is derived from the expression in the user query. See the `selectDataMap` function.


---