[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap


qiuchenjian-2
GitHub user xuchuanyin opened a pull request:

    https://github.com/apache/carbondata/pull/2963

    [CARBONDATA-3139] Fix bugs in MinMaxDataMap

    Make the MinMax datamap usable and add more tests for it.
    The MinMax DataMap may be useful if we want to implement a datamap for external file formats such as CSV or Parquet.
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 181125_bug_minmax_dm

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2963.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2963
   
----
commit cad25a9a8994a19d306ed3c53b7fd0aaf58a5811
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-11-25T13:36:17Z

    Fix bugs in MinMaxDataMap
   
    make minmax datamap usable and add more tests for it

----


---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
Github user chenliang613 commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Consider writing an example: how to use MinMaxDataMap to build an index for a CSV file.


---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1573/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1785/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9832/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    > Consider writing an example: how to use MinMaxDataMap to build an index for a CSV file.
   
    @chenliang613 This will require CarbonData to support external file formats (such as CSV) in loading and reading. Then we can simply use this MinMax datamap, as well as the bloomfilter datamap, as a file-level datamap for these formats.
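
To illustrate what a file-level min/max index could do for such formats, here is a minimal sketch. It is not CarbonData code: the class `MinMaxPruneSketch`, the nested `FileMinMax` entry, and the file names are all hypothetical, and it assumes a single `long` column with an equality filter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical file-level min/max index; not CarbonData's actual datamap classes. */
final class MinMaxPruneSketch {

  /** Min and max of one indexed column within one file. */
  static final class FileMinMax {
    final String file;
    final long min;
    final long max;

    FileMinMax(String file, long[] values) {
      long lo = Long.MAX_VALUE;
      long hi = Long.MIN_VALUE;
      for (long v : values) {
        lo = Math.min(lo, v);
        hi = Math.max(hi, v);
      }
      this.file = file;
      this.min = lo;
      this.max = hi;
    }

    /** An equality filter can only match if the literal lies in [min, max]. */
    boolean mightContain(long literal) {
      return min <= literal && literal <= max;
    }
  }

  /** Returns the files that still have to be scanned for "col = literal". */
  static List<String> prune(List<FileMinMax> index, long literal) {
    List<String> hits = new ArrayList<>();
    for (FileMinMax entry : index) {
      if (entry.mightContain(literal)) {
        hits.add(entry.file);
      }
    }
    return hits;
  }

  public static void main(String[] args) {
    List<FileMinMax> index = new ArrayList<>();
    index.add(new FileMinMax("part-0.csv", new long[] {1, 5, 9}));
    index.add(new FileMinMax("part-1.csv", new long[] {20, 25, 30}));
    // Only files whose [min, max] range covers 25 are kept.
    System.out.println(prune(index, 25));
  }
}
```

A min/max index can only rule files out; a file that survives pruning may still contain no matching row, so the scan itself remains necessary.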


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r237452138
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    --- End diff --
   
    DataTypes.BYTE?


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user Indhumathi27 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238152299
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((short) value);
    +      } else if (DataTypes.INT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((int) value);
    +      } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((long) value);
    +      } else if (DataTypes.DOUBLE == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((double) value);
    +      } else if (DataTypes.isDecimal(dataType)) {
    --- End diff --
   
    Please handle the DataTypes.FLOAT data type as well.
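
As a rough illustration of this request (and of the earlier question about DataTypes.BYTE), the sketch below shows a type dispatch that also covers byte and float values. It is a stand-in under stated assumptions: `TypeDispatchSketch` and its `Kind` enum are hypothetical, and it does not use CarbonData's `ColumnPageStatsCollector`, whose available `update` overloads should be checked before adding the corresponding branches.

```java
/** Hypothetical min/max tracker; a stand-in, not CarbonData's ColumnPageStatsCollector. */
final class TypeDispatchSketch {
  /** Hypothetical column kinds, mirroring the primitive branches in the diff plus BYTE and FLOAT. */
  enum Kind { BYTE, SHORT, INT, LONG, FLOAT, DOUBLE }

  private final Kind kind;
  private long minLong = Long.MAX_VALUE;
  private long maxLong = Long.MIN_VALUE;
  private double minDouble = Double.POSITIVE_INFINITY;
  private double maxDouble = Double.NEGATIVE_INFINITY;

  TypeDispatchSketch(Kind kind) {
    this.kind = kind;
  }

  /** Unboxes the value according to the column kind, then widens it. */
  void update(Object value) {
    switch (kind) {
      case BYTE:
        updateIntegral((byte) value);
        break;
      case SHORT:
        updateIntegral((short) value);
        break;
      case INT:
        updateIntegral((int) value);
        break;
      case LONG:
        updateIntegral((long) value);
        break;
      case FLOAT:
        updateFractional((float) value);
        break;
      case DOUBLE:
        updateFractional((double) value);
        break;
      default:
        throw new UnsupportedOperationException("unsupported kind " + kind);
    }
  }

  private void updateIntegral(long v) {
    minLong = Math.min(minLong, v);
    maxLong = Math.max(maxLong, v);
  }

  private void updateFractional(double v) {
    minDouble = Math.min(minDouble, v);
    maxDouble = Math.max(maxDouble, v);
  }

  long minIntegral() { return minLong; }
  long maxIntegral() { return maxLong; }
  double minFractional() { return minDouble; }
  double maxFractional() { return maxDouble; }

  public static void main(String[] args) {
    TypeDispatchSketch floats = new TypeDispatchSketch(Kind.FLOAT);
    floats.update(1.5f);
    floats.update(-2.25f);
    System.out.println(floats.minFractional() + " .. " + floats.maxFractional());
  }
}
```

The casts unbox the `Object` to the exact wrapper type first, so a value boxed as a different type than the column kind fails with a `ClassCastException` instead of being silently misread.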


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238153188
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((short) value);
    +      } else if (DataTypes.INT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((int) value);
    +      } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((long) value);
    +      } else if (DataTypes.DOUBLE == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((double) value);
    +      } else if (DataTypes.isDecimal(dataType)) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((BigDecimal) value);
    +      } else {
    +        throw new UnsupportedOperationException("unsupported data type " + dataType);
    +      }
    +    } else {
    +      // While pruning for query, we want to reuse the pruning method from carbon, so here for
    +      // dictionary columns, we need to store the mdk value in the minmax index.
    +      // For direct generating, the input value is already MDK; For late building, the input value
    +      // is surrogate key, so we need to handle it here.
    +      if (indexCol.hasEncoding(Encoding.DICTIONARY)
    +          || indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(convertDictValueToMdk(indexColIdx, value));
    +      } else {
    +        byte[] plainValue = convertNonDicValueToPlain(indexColIdx, (byte[]) value);
    +        indexColumnMinMaxCollectors[indexColIdx].update(plainValue);
    +      }
    +    }
    +  }
    +
    +  protected abstract byte[] convertDictValueToMdk(int indexColIdx, Object value);
    +
    +  protected abstract byte[] convertNonDicValueToPlain(int indexColIdx, byte[] value);
    +
    +  private void logMinMaxInfo(int indexColId) {
    +    CarbonColumn indexCol = indexColumns.get(indexColId);
    +    StringBuilder sb = new StringBuilder("flush blockletId->").append(currentBlockletId)
    +        .append(", column->").append(indexCol.getColName())
    +        .append(", dataType->").append(indexCol.getDataType().getName());
    +    Object min = indexColumnMinMaxCollectors[indexColId].getPageStats().getMin();
    +    Object max = indexColumnMinMaxCollectors[indexColId].getPageStats().getMax();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      sb.append(", min->").append(min)
    +          .append(", max->").append(max);
    +    } else {
    +      sb.append(", min->")
    +          .append(new String((byte[]) min, CarbonCommonConstants.DEFAULT_CHARSET_CLASS))
    +          .append(", max->")
    +          .append(new String((byte[]) max, CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
    +
    +    }
    +    LOGGER.debug(sb.toString());
    --- End diff --
   
    Please add an isDebugEnabled() check wherever debug logs are added.
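
The point of such a guard is to avoid building the log message when debug output is disabled. A minimal sketch of the pattern follows. To stay self-contained it uses `java.util.logging`, where `isLoggable(Level.FINE)` plays the role that `isDebugEnabled()` plays on a log4j logger. The class `GuardedDebugLogSketch`, its `describe` helper, and the `messagesBuilt` counter are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical example of guarding a debug log; uses java.util.logging instead of log4j. */
final class GuardedDebugLogSketch {
  private static final Logger LOGGER =
      Logger.getLogger(GuardedDebugLogSketch.class.getName());

  /** Counts how often the (potentially expensive) message was actually built. */
  static int messagesBuilt = 0;

  /** Switches the logger between FINE (debug-like) and INFO. */
  static void setDebug(boolean enabled) {
    LOGGER.setLevel(enabled ? Level.FINE : Level.INFO);
  }

  /** Stands in for the StringBuilder work done in logMinMaxInfo. */
  static String describe(String column, Object min, Object max) {
    messagesBuilt++;
    return "column->" + column + ", min->" + min + ", max->" + max;
  }

  static void logMinMax(String column, Object min, Object max) {
    // With log4j this guard would read: if (LOGGER.isDebugEnabled()) { ... }
    if (LOGGER.isLoggable(Level.FINE)) {
      LOGGER.fine(describe(column, min, max));
    }
  }

  public static void main(String[] args) {
    setDebug(false);
    logMinMax("c1", 1, 9);   // message is not built
    setDebug(true);
    logMinMax("c1", 1, 9);   // message is built once
    System.out.println("messages built: " + messagesBuilt);
  }
}
```

In `logMinMaxInfo` from the diff, the whole `StringBuilder` construction would sit inside the guard, since that is the work being saved.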


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238153927
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java ---
    @@ -0,0 +1,365 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.datamap.DataMapDistributable;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.DataMapMeta;
    +import org.apache.carbondata.core.datamap.DataMapStoreManager;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.TableDataMap;
    +import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.features.TableOperation;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.events.Event;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * Min Max DataMap Factory
    + */
    +@InterfaceAudience.Internal
    +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
    +  private DataMapMeta dataMapMeta;
    +  private String dataMapName;
    +  // segmentId -> list of index files
    +  private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
    +  private Cache<MinMaxDataMapCacheKeyValue.Key, MinMaxDataMapCacheKeyValue.Value> cache;
    +
    +  public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    super(carbonTable, dataMapSchema);
    +
    +    // this is an example for datamap, we can choose the columns and operations that
    +    // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
    +
    +    this.dataMapName = dataMapSchema.getDataMapName();
    +    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +
    +    // operations that will be supported on the indexed columns
    +    List<ExpressionType> optOperations = new ArrayList<>();
    +    optOperations.add(ExpressionType.NOT);
    +    optOperations.add(ExpressionType.EQUALS);
    +    optOperations.add(ExpressionType.NOT_EQUALS);
    +    optOperations.add(ExpressionType.GREATERTHAN);
    +    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.LESSTHAN);
    +    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.IN);
    +    this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
    +
    +    // init cache. note that the createCache ensures the singleton of the cache
    +    try {
    +      this.cache = CacheProvider.getInstance()
    +          .createCache(new CacheType("minmax_cache"), MinMaxDataMapCache.class.getName());
    +    } catch (Exception e) {
    +      LOGGER.error("Failed to create cache for minmax datamap", e);
    +      throw new MalformedDataMapCommandException(e.getMessage());
    +    }
    +  }
    +
    +  /**
    +   * createWriter will return the MinMaxDataWriter.
    +   *
    +   * @param segment
    +   * @param shardName
    +   * @return
    +   */
    +  @Override
    +  public DataMapWriter createWriter(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    if (LOGGER.isDebugEnabled()) {
    +      LOGGER.debug(String.format(
    +          "Data of MinMaxDataMap %s for table %s will be written to %s",
    +          dataMapName, getCarbonTable().getTableName(), shardName));
    +    }
    +    return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  @Override
    +  public DataMapBuilder createBuilder(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  /**
    +   * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    +   *
    +   * @param segment
    +   * @return
    +   * @throws IOException
    +   */
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
    +    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
    +    if (shardPaths == null) {
    +      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    +          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
    +      CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +      shardPaths = new HashSet<>();
    +      for (CarbonFile carbonFile : carbonFiles) {
    +        shardPaths.add(carbonFile.getAbsolutePath());
    +      }
    +      segmentMap.put(segment.getSegmentNo(), shardPaths);
    +    }
    +
    +    for (String shard : shardPaths) {
    +      MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
    +      dataMap.init(new MinMaxDataMapModel(shard, cache, segment.getConfiguration()));
    +      dataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +      dataMaps.add(dataMap);
    +    }
    +    return dataMaps;
    +  }
    +
    +  @Override
    +  public DataMapMeta getMeta() {
    +    return this.dataMapMeta;
    +  }
    +
    +  @Override
    +  public DataMapLevel getDataMapLevel() {
    +    return DataMapLevel.CG;
    +  }
    +
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
    +      throws IOException {
    +    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    +    MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
    +    String indexPath = ((MinMaxDataMapDistributable) distributable).getIndexPath();
    +    minMaxIndexDataMap.init(
    +        new MinMaxDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
    +    minMaxIndexDataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +    coarseGrainDataMaps.add(minMaxIndexDataMap);
    +    return coarseGrainDataMaps;
    +  }
    +
    +  /**
    +   * returns all the directories of lucene index files for query
    +   * Note: copied from BloomFilterDataMapFactory, will extract to a common interface
    +   */
    +  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
    +    List<CarbonFile> indexDirs = new ArrayList<>();
    +    List<TableDataMap> dataMaps;
    +    try {
    +      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
    +      // the path till the index file directories in all datamaps folders present in each segment
    +      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    +    } catch (IOException ex) {
    +      LOGGER.error(String.format(
    +          "failed to get datamaps for tablePath %s, segmentId %s", tablePath, segmentId), ex);
    +      throw new RuntimeException(ex);
    +    }
    +    if (dataMaps.size() > 0) {
    +      for (TableDataMap dataMap : dataMaps) {
    +        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    +          List<CarbonFile> indexFiles;
    +          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
    +              dataMap.getDataMapSchema().getDataMapName());
    +          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
    +          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
    --- End diff --
   
    Please use org.apache.carbondata.core.datastore.impl.FileFactory#getCarbonFile(java.lang.String) here. That overload resolves the file type internally, so it does not need to be looked up and passed in separately.
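
    To illustrate why the one-argument overload is enough, here is a small sketch of an overload that derives the file type itself and delegates to the two-argument form. Everything in it is hypothetical: `FileTypeOverloadSketch`, `resolveFileType`, `open`, and the scheme-based detection are stand-ins for illustration only and do not reproduce CarbonData's FileFactory implementation.

```java
import java.util.Locale;

final class FileTypeOverloadSketch {
  // Hypothetical file types; not CarbonData's FileFactory.FileType.
  enum FileType { LOCAL, HDFS, S3 }

  // Hypothetical detection by URI scheme, for illustration only.
  static FileType resolveFileType(String path) {
    String lower = path.toLowerCase(Locale.ROOT);
    if (lower.startsWith("hdfs://")) {
      return FileType.HDFS;
    }
    if (lower.startsWith("s3a://")) {
      return FileType.S3;
    }
    return FileType.LOCAL;
  }

  // Two-argument form: the caller supplies the file type.
  static String open(String path, FileType fileType) {
    return fileType + ":" + path;
  }

  // One-argument form: resolves the file type itself, then delegates.
  static String open(String path) {
    return open(path, resolveFileType(path));
  }

  public static void main(String[] args) {
    String path = "hdfs://nn/store/dm";
    // Both calls yield the same result; the second needs no explicit lookup.
    System.out.println(open(path, resolveFileType(path)));
    System.out.println(open(path));
  }
}
```

    Applied to the diff, the two lines that compute fileType and then call getCarbonFile(dmPath, fileType) would collapse into a single getCarbonFile(dmPath) call, which is the form getDataMaps(Segment) already uses.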


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238154502
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java ---
    @@ -0,0 +1,365 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.datamap.DataMapDistributable;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.DataMapMeta;
    +import org.apache.carbondata.core.datamap.DataMapStoreManager;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.TableDataMap;
    +import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.features.TableOperation;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.events.Event;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * Min Max DataMap Factory
    + */
    +@InterfaceAudience.Internal
    +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
    +  private DataMapMeta dataMapMeta;
    +  private String dataMapName;
    +  // segmentId -> list of index files
    +  private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
    +  private Cache<MinMaxDataMapCacheKeyValue.Key, MinMaxDataMapCacheKeyValue.Value> cache;
    +
    +  public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    super(carbonTable, dataMapSchema);
    +
    +    // this is an example for datamap, we can choose the columns and operations that
    +    // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
    +
    +    this.dataMapName = dataMapSchema.getDataMapName();
    +    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +
    +    // operations that will be supported on the indexed columns
    +    List<ExpressionType> optOperations = new ArrayList<>();
    +    optOperations.add(ExpressionType.NOT);
    +    optOperations.add(ExpressionType.EQUALS);
    +    optOperations.add(ExpressionType.NOT_EQUALS);
    +    optOperations.add(ExpressionType.GREATERTHAN);
    +    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.LESSTHAN);
    +    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.IN);
    +    this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
    +
    +    // init cache. note that the createCache ensures the singleton of the cache
    +    try {
    +      this.cache = CacheProvider.getInstance()
    +          .createCache(new CacheType("minmax_cache"), MinMaxDataMapCache.class.getName());
    +    } catch (Exception e) {
    +      LOGGER.error("Failed to create cache for minmax datamap", e);
    +      throw new MalformedDataMapCommandException(e.getMessage());
    +    }
    +  }
    +
    +  /**
    +   * createWriter will return the MinMaxDataWriter.
    +   *
    +   * @param segment
    +   * @param shardName
    +   * @return
    +   */
    +  @Override
    +  public DataMapWriter createWriter(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    if (LOGGER.isDebugEnabled()) {
    +      LOGGER.debug(String.format(
    +          "Data of MinMaxDataMap %s for table %s will be written to %s",
    +          dataMapName, getCarbonTable().getTableName(), shardName));
    +    }
    +    return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  @Override
    +  public DataMapBuilder createBuilder(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  /**
    +   * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    +   *
    +   * @param segment
    +   * @return
    +   * @throws IOException
    +   */
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
    +    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
    +    if (shardPaths == null) {
    +      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    +          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
    +      CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +      shardPaths = new HashSet<>();
    +      for (CarbonFile carbonFile : carbonFiles) {
    +        shardPaths.add(carbonFile.getAbsolutePath());
    +      }
    +      segmentMap.put(segment.getSegmentNo(), shardPaths);
    +    }
    +
    +    for (String shard : shardPaths) {
    +      MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
    +      dataMap.init(new MinMaxDataMapModel(shard, cache, segment.getConfiguration()));
    +      dataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +      dataMaps.add(dataMap);
    +    }
    +    return dataMaps;
    +  }
    +
    +  @Override
    +  public DataMapMeta getMeta() {
    +    return this.dataMapMeta;
    +  }
    +
    +  @Override
    +  public DataMapLevel getDataMapLevel() {
    +    return DataMapLevel.CG;
    +  }
    +
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
    +      throws IOException {
    +    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    +    MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
    +    String indexPath = ((MinMaxDataMapDistributable) distributable).getIndexPath();
    +    minMaxIndexDataMap.init(
    +        new MinMaxDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
    +    minMaxIndexDataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +    coarseGrainDataMaps.add(minMaxIndexDataMap);
    +    return coarseGrainDataMaps;
    +  }
    +
    +  /**
    +   * returns all the directories of lucene index files for query
    +   * Note: copied from BloomFilterDataMapFactory, will extract to a common interface
    +   */
    +  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
    +    List<CarbonFile> indexDirs = new ArrayList<>();
    +    List<TableDataMap> dataMaps;
    +    try {
    +      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
    +      // the path till the index file directories in all datamaps folders present in each segment
    +      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    +    } catch (IOException ex) {
    +      LOGGER.error(String.format(
    +          "failed to get datamaps for tablePath %s, segmentId %s", tablePath, segmentId), ex);
    +      throw new RuntimeException(ex);
    +    }
    +    if (dataMaps.size() > 0) {
    +      for (TableDataMap dataMap : dataMaps) {
    +        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    +          List<CarbonFile> indexFiles;
    +          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
    +              dataMap.getDataMapSchema().getDataMapName());
    +          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
    +          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
    +          indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
    +            @Override
    +            public boolean accept(CarbonFile file) {
    +              return file.isDirectory();
    +            }
    +          }));
    +          indexDirs.addAll(indexFiles);
    +        }
    +      }
    +    }
    +    return indexDirs.toArray(new CarbonFile[0]);
    +  }
    +
    +  @Override
    +  public List<DataMapDistributable> toDistributable(Segment segment) {
    +    List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
    +    CarbonFile[] indexDirs =
    +        getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
    +    if (segment.getFilteredIndexShardNames().size() == 0) {
    +      for (CarbonFile indexDir : indexDirs) {
    +        DataMapDistributable bloomDataMapDistributable =
    +            new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +        dataMapDistributableList.add(bloomDataMapDistributable);
    +      }
    +      return dataMapDistributableList;
    +    }
    +    for (CarbonFile indexDir : indexDirs) {
    +      // Filter out the tasks which are filtered through CG datamap.
    +      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
    +        continue;
    +      }
    +      DataMapDistributable bloomDataMapDistributable =
    +          new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
    +      dataMapDistributableList.add(bloomDataMapDistributable);
    +    }
    +    return dataMapDistributableList;
    +  }
    +
    +  @Override
    +  public void fireEvent(Event event) {
    +
    +  }
    +
    +  @Override
    +  public void clear(Segment segment) {
    +    Set<String> shards = segmentMap.remove(segment.getSegmentNo());
    +    if (null != shards) {
    +      for (String shard : shards) {
    +        cache.invalidate(new MinMaxDataMapCacheKeyValue.Key(shard));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public synchronized void clear() {
    +    if (segmentMap.size() > 0) {
    +      List<String> segments = new ArrayList<>(segmentMap.keySet());
    +      for (String segmentId : segments) {
    +        clear(new Segment(segmentId, null, null));
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData(Segment segment) throws IOException {
    +    try {
    +      String segmentId = segment.getSegmentNo();
    +      String datamapPath = CarbonTablePath
    +          .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
    +      if (FileFactory.isFileExist(datamapPath)) {
    +        CarbonFile file =
    +            FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath));
    +        CarbonUtil.deleteFoldersAndFilesSilent(file);
    +      }
    +    } catch (InterruptedException ex) {
    +      throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo());
    +    }
    +  }
    +
    +  @Override
    +  public void deleteDatamapData() {
    +    SegmentStatusManager ssm =
    +        new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
    +    try {
    +      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
    +      for (Segment segment : validSegments) {
    +        deleteDatamapData(segment);
    +      }
    +    } catch (IOException e) {
    +      LOGGER.error("drop datamap failed, failed to delete datamap directory");
    +    }
    +  }
    +
    +  @Override
    +  public boolean willBecomeStale(TableOperation operation) {
    +    switch (operation) {
    +      case ALTER_RENAME:
    --- End diff --
   
    All the cases that return true can be grouped together, and likewise all the cases that return false, for improved readability.
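
    A sketch of the grouped form, using switch fall-through so that each return value appears once. The enum below is a hypothetical stand-in for TableOperation: only ALTER_RENAME appears in the quoted diff, the other constants are illustrative, and which operations map to true or false here is an assumption made for the example, not the PR's actual behaviour.

```java
final class GroupedSwitchSketch {
  // Hypothetical stand-in for TableOperation; only ALTER_RENAME is taken from the diff.
  enum Operation { ALTER_RENAME, ALTER_ADD_COLUMN, ALTER_DROP, STREAMING, UPDATE, DELETE }

  // The true/false assignment below is illustrative only.
  static boolean willBecomeStale(Operation operation) {
    switch (operation) {
      // All cases that return false share one block.
      case ALTER_RENAME:
      case ALTER_ADD_COLUMN:
        return false;
      // All cases that return true share one block.
      case ALTER_DROP:
      case STREAMING:
      case UPDATE:
      case DELETE:
        return true;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    for (Operation op : Operation.values()) {
      System.out.println(op + " -> " + willBecomeStale(op));
    }
  }
}
```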


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user Indhumathi27 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238156188
 
    --- Diff: integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax
    +
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +
    +class MinMaxDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll {
    +  private val minmaxDataMapFactoryName = "org.apache.carbondata.datamap.minmax.MinMaxDataMapFactory"
    +  var originalStatEnabled = CarbonProperties.getInstance().getProperty(
    +    CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
    +    CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
    +
    +  override protected def beforeAll(): Unit = {
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
    +      "yyyy-MM-dd")
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    +      "yyyy-MM-dd HH:mm:ss")
    --- End diff --
   
    Please check whether the default timestamp/date format constants can be used here instead of string literals, for example
    org.apache.carbondata.core.constants.CarbonCommonConstants#CARBON_TIMESTAMP_DEFAULT_FORMAT
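
    A small sketch of the idea: reference a named default-format constant when setting the property, rather than repeating the pattern as a literal. The two constants below are hypothetical local stand-ins for the CarbonCommonConstants fields, and both the property key and the pattern value are assumptions (the pattern matches the literal used in the test above). java.util.Properties stands in for CarbonProperties.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

final class DefaultFormatSketch {
  // Hypothetical stand-ins for the CarbonCommonConstants fields; values are assumed.
  static final String CARBON_TIMESTAMP_FORMAT = "carbon.timestamp.format";
  static final String CARBON_TIMESTAMP_DEFAULT_FORMAT = "yyyy-MM-dd HH:mm:ss";

  // Sets the timestamp format from the named constant instead of a literal.
  static Properties configure() {
    Properties props = new Properties();
    props.setProperty(CARBON_TIMESTAMP_FORMAT, CARBON_TIMESTAMP_DEFAULT_FORMAT);
    return props;
  }

  // Parses a timestamp with the configured pattern, falling back to the default.
  static LocalDateTime parse(Properties props, String text) {
    String pattern =
        props.getProperty(CARBON_TIMESTAMP_FORMAT, CARBON_TIMESTAMP_DEFAULT_FORMAT);
    return LocalDateTime.parse(text, DateTimeFormatter.ofPattern(pattern));
  }

  public static void main(String[] args) {
    System.out.println(parse(configure(), "2018-11-25 13:36:17"));
  }
}
```

    If the default constants already carry the patterns the test needs, the literals in beforeAll can be replaced by them without changing behaviour.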


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240578382
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((short) value);
    +      } else if (DataTypes.INT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((int) value);
    +      } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((long) value);
    +      } else if (DataTypes.DOUBLE == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((double) value);
    +      } else if (DataTypes.isDecimal(dataType)) {
    --- End diff --
   
    OK


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240579236
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    --- End diff --
   
    OK


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240579947
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((short) value);
    +      } else if (DataTypes.INT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((int) value);
    +      } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((long) value);
    +      } else if (DataTypes.DOUBLE == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((double) value);
    +      } else if (DataTypes.isDecimal(dataType)) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((BigDecimal) value);
    +      } else {
    +        throw new UnsupportedOperationException("unsupported data type " + dataType);
    +      }
    +    } else {
    +      // While pruning for query, we want to reuse the pruning method from carbon, so here for
    +      // dictionary columns, we need to store the mdk value in the minmax index.
    +      // For direct generating, the input value is already MDK; For late building, the input value
    +      // is surrogate key, so we need to handle it here.
    +      if (indexCol.hasEncoding(Encoding.DICTIONARY)
    +          || indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(convertDictValueToMdk(indexColIdx, value));
    +      } else {
    +        byte[] plainValue = convertNonDicValueToPlain(indexColIdx, (byte[]) value);
    +        indexColumnMinMaxCollectors[indexColIdx].update(plainValue);
    +      }
    +    }
    +  }
    +
    +  protected abstract byte[] convertDictValueToMdk(int indexColIdx, Object value);
    +
    +  protected abstract byte[] convertNonDicValueToPlain(int indexColIdx, byte[] value);
    +
    +  private void logMinMaxInfo(int indexColId) {
    +    CarbonColumn indexCol = indexColumns.get(indexColId);
    +    StringBuilder sb = new StringBuilder("flush blockletId->").append(currentBlockletId)
    +        .append(", column->").append(indexCol.getColName())
    +        .append(", dataType->").append(indexCol.getDataType().getName());
    +    Object min = indexColumnMinMaxCollectors[indexColId].getPageStats().getMin();
    +    Object max = indexColumnMinMaxCollectors[indexColId].getPageStats().getMax();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      sb.append(", min->").append(min)
    +          .append(", max->").append(max);
    +    } else {
    +      sb.append(", min->")
    +          .append(new String((byte[]) min, CarbonCommonConstants.DEFAULT_CHARSET_CLASS))
    +          .append(", max->")
    +          .append(new String((byte[]) max, CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
    +
    +    }
    +    LOGGER.debug(sb.toString());
    --- End diff --
   
    Yes. The whole `logMinMaxInfo` method is already guarded by an `isDebugEnabled` check at its call site, so we can skip the check here.
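
The guard-once pattern described in this comment can be sketched as follows. This is a minimal illustration, not the code from the PR: it uses `java.util.logging` instead of log4j so that it runs without extra dependencies (`isLoggable(Level.FINE)` plays the role of `isDebugEnabled`), and the class `DebugGuardSketch`, the `flush` method, and the `messagesBuilt` counter are hypothetical names introduced only for this example.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class DebugGuardSketch {
  static final Logger LOGGER = Logger.getLogger(DebugGuardSketch.class.getName());

  // Counts how often a log message was actually assembled.
  static int messagesBuilt = 0;

  // Hypothetical stand-in for logMinMaxInfo: no level check inside,
  // because every caller is expected to check the level first.
  static void logMinMaxInfo(int blockletId, String column, long min, long max) {
    messagesBuilt++;
    LOGGER.fine("flush blockletId->" + blockletId + ", column->" + column
        + ", min->" + min + ", max->" + max);
  }

  // Hypothetical stand-in for the flush path: the only level check lives here.
  static void flush(int blockletId, String[] columns, long[] mins, long[] maxs) {
    if (LOGGER.isLoggable(Level.FINE)) {
      for (int i = 0; i < columns.length; i++) {
        logMinMaxInfo(blockletId, columns[i], mins[i], maxs[i]);
      }
    }
  }

  public static void main(String[] args) {
    String[] columns = {"id", "price"};
    long[] mins = {1L, 10L};
    long[] maxs = {9L, 99L};

    LOGGER.setLevel(Level.INFO);  // debug off: the guard skips message building
    flush(0, columns, mins, maxs);
    System.out.println("messages built with INFO: " + messagesBuilt);

    LOGGER.setLevel(Level.FINE);  // debug on: one message per column
    flush(1, columns, mins, maxs);
    System.out.println("messages built with FINE: " + messagesBuilt);
  }
}
```

The trade-off is that the helper is only safe to call from guarded call sites; a second check inside the helper would be redundant on that path but would protect against future unguarded callers.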


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240580992
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java ---
    @@ -0,0 +1,365 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.datamap.DataMapDistributable;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.DataMapMeta;
    +import org.apache.carbondata.core.datamap.DataMapStoreManager;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.TableDataMap;
    +import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.features.TableOperation;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.events.Event;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * Min Max DataMap Factory
    + */
    +@InterfaceAudience.Internal
    +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
    +  private DataMapMeta dataMapMeta;
    +  private String dataMapName;
    +  // segmentId -> list of index files
    +  private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
    +  private Cache<MinMaxDataMapCacheKeyValue.Key, MinMaxDataMapCacheKeyValue.Value> cache;
    +
    +  public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
    +      throws MalformedDataMapCommandException {
    +    super(carbonTable, dataMapSchema);
    +
    +    // this is an example for datamap, we can choose the columns and operations that
    +    // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
    +
    +    this.dataMapName = dataMapSchema.getDataMapName();
    +    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
    +
    +    // operations that will be supported on the indexed columns
    +    List<ExpressionType> optOperations = new ArrayList<>();
    +    optOperations.add(ExpressionType.NOT);
    +    optOperations.add(ExpressionType.EQUALS);
    +    optOperations.add(ExpressionType.NOT_EQUALS);
    +    optOperations.add(ExpressionType.GREATERTHAN);
    +    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.LESSTHAN);
    +    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
    +    optOperations.add(ExpressionType.IN);
    +    this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
    +
    +    // init cache. note that the createCache ensures the singleton of the cache
    +    try {
    +      this.cache = CacheProvider.getInstance()
    +          .createCache(new CacheType("minmax_cache"), MinMaxDataMapCache.class.getName());
    +    } catch (Exception e) {
    +      LOGGER.error("Failed to create cache for minmax datamap", e);
    +      throw new MalformedDataMapCommandException(e.getMessage());
    +    }
    +  }
    +
    +  /**
    +   * createWriter will return the MinMaxDataWriter.
    +   *
    +   * @param segment
    +   * @param shardName
    +   * @return
    +   */
    +  @Override
    +  public DataMapWriter createWriter(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    if (LOGGER.isDebugEnabled()) {
    +      LOGGER.debug(String.format(
    +          "Data of MinMaxDataMap %s for table %s will be written to %s",
    +          dataMapName, getCarbonTable().getTableName(), shardName));
    +    }
    +    return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  @Override
    +  public DataMapBuilder createBuilder(Segment segment, String shardName,
    +      SegmentProperties segmentProperties) throws IOException {
    +    return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
    +        dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties);
    +  }
    +
    +  /**
    +   * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    +   *
    +   * @param segment
    +   * @return
    +   * @throws IOException
    +   */
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
    +    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
    +    if (shardPaths == null) {
    +      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    +          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
    +      CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +      shardPaths = new HashSet<>();
    +      for (CarbonFile carbonFile : carbonFiles) {
    +        shardPaths.add(carbonFile.getAbsolutePath());
    +      }
    +      segmentMap.put(segment.getSegmentNo(), shardPaths);
    +    }
    +
    +    for (String shard : shardPaths) {
    +      MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
    +      dataMap.init(new MinMaxDataMapModel(shard, cache, segment.getConfiguration()));
    +      dataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +      dataMaps.add(dataMap);
    +    }
    +    return dataMaps;
    +  }
    +
    +  @Override
    +  public DataMapMeta getMeta() {
    +    return this.dataMapMeta;
    +  }
    +
    +  @Override
    +  public DataMapLevel getDataMapLevel() {
    +    return DataMapLevel.CG;
    +  }
    +
    +  @Override
    +  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
    +      throws IOException {
    +    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    +    MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
    +    String indexPath = ((MinMaxDataMapDistributable) distributable).getIndexPath();
    +    minMaxIndexDataMap.init(
    +        new MinMaxDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
    +    minMaxIndexDataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +    coarseGrainDataMaps.add(minMaxIndexDataMap);
    +    return coarseGrainDataMaps;
    +  }
    +
    +  /**
    +   * returns all the directories of lucene index files for query
    +   * Note: copied from BloomFilterDataMapFactory, will extract to a common interface
    +   */
    +  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
    +    List<CarbonFile> indexDirs = new ArrayList<>();
    +    List<TableDataMap> dataMaps;
    +    try {
    +      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
    +      // the path till the index file directories in all datamaps folders present in each segment
    +      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    +    } catch (IOException ex) {
    +      LOGGER.error(String.format(
    +          "failed to get datamaps for tablePath %s, segmentId %s", tablePath, segmentId), ex);
    +      throw new RuntimeException(ex);
    +    }
    +    if (dataMaps.size() > 0) {
    +      for (TableDataMap dataMap : dataMaps) {
    +        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    +          List<CarbonFile> indexFiles;
    +          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
    +              dataMap.getDataMapSchema().getDataMapName());
    +          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
    +          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
    --- End diff --
   
    OK. We can also mark this method as deprecated later.


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240581131
 
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    +    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    +    // the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    --- End diff --
   
    I mean that the Byte type should also be handled here.
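
A minimal sketch of what that could look like, assuming the row value arrives as a boxed `Object` as in `updateBlockletMinMax`. The class `IntegralMinMaxSketch` is hypothetical and only stands in for the real stats collector. It widens every integral type, Byte included, to `long` before comparing.

```java
/** Hypothetical stand-in for a per-column min/max collector; not CarbonData's API. */
final class IntegralMinMaxSketch {
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;
  private boolean hasValue = false;

  /** Updates min/max from a boxed integral value; null values are skipped. */
  void update(Object value) {
    if (value == null) {
      return;
    }
    long v;
    if (value instanceof Byte) {
      // The case raised in the review: Byte needs its own branch.
      v = ((Byte) value).longValue();
    } else if (value instanceof Short) {
      v = ((Short) value).longValue();
    } else if (value instanceof Integer) {
      v = ((Integer) value).longValue();
    } else if (value instanceof Long) {
      v = (Long) value;
    } else {
      throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
    }
    min = Math.min(min, v);
    max = Math.max(max, v);
    hasValue = true;
  }

  boolean hasValue() {
    return hasValue;
  }

  long getMin() {
    return min;
  }

  long getMax() {
    return max;
  }
}
```

Without the Byte branch, a byte column would fall through to the exception (or, in a cast-based chain, fail with a `ClassCastException`).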


---

[GitHub] carbondata pull request #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap e...

qiuchenjian-2
GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r240581684
 
    --- Diff: integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax
    +
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +
    +class MinMaxDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll {
    +  private val minmaxDataMapFactoryName = "org.apache.carbondata.datamap.minmax.MinMaxDataMapFactory"
    +  var originalStatEnabled = CarbonProperties.getInstance().getProperty(
    +    CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
    +    CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
    +
    +  override protected def beforeAll(): Unit = {
    +    CarbonProperties.getInstance()
    +      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
    +      "yyyy-MM-dd")
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    +      "yyyy-MM-dd HH:mm:ss")
    --- End diff --
   
    I think this modification is OK.
    We specify the formats explicitly here to make clear that they describe our input data, so the test does not depend on the defaults (which I'm afraid may change later).
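
To illustrate why pinning the patterns matters, here is a small standalone sketch. It uses `java.time` as a stand-in and is not how CarbonData itself parses these properties; the class `ExplicitFormatSketch` is hypothetical. Only the two pattern strings come from the test setup above.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper showing input parsed with explicitly pinned patterns. */
final class ExplicitFormatSketch {
  // The same pattern strings the test sets for the date and timestamp formats.
  static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  static final DateTimeFormatter TIMESTAMP = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  private ExplicitFormatSketch() { }

  /** Parses a date string; throws DateTimeParseException if it does not match the pattern. */
  static LocalDate parseDate(String text) {
    return LocalDate.parse(text, DATE);
  }

  /** Parses a timestamp string; throws DateTimeParseException if it does not match. */
  static LocalDateTime parseTimestamp(String text) {
    return LocalDateTime.parse(text, TIMESTAMP);
  }
}
```

With the pattern fixed, input such as `2018-11-25` parses the same way regardless of any global default, and input in another layout fails loudly instead of being silently misread.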


---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
GitHub user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build succeeded with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1703/



---

[GitHub] carbondata issue #2963: [CARBONDATA-3139] Fix bugs in MinMaxDataMap example

qiuchenjian-2
GitHub user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2963
 
    Build succeeded with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1704/



---