[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

62 messages

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
GitHub user kevinjmh opened a pull request:

    https://github.com/apache/carbondata/pull/2624

    [CARBONDATA-2845][BloomDataMap] Merge bloom index files of multi-shards for each index column

    Currently one bloom index file is generated per task per load, so query performance degrades when a table has many segments and shards. The main datamap faced this same problem before.
    This PR merges the bloom index files at segment scope, just as the main datamap does.
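Not part of the PR text, but the segment-scope merge it describes can be sketched roughly as follows. The layout here (shard count, shard name, blob length, raw bytes) is a simplified assumption for illustration; the actual PR serializes a thrift `MergedBloomIndexHeader` plus `MergedBloomIndex`, not this format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

// Simplified sketch: merge the per-shard bloom index blobs of one index
// column into a single segment-level blob, recording which shard each
// blob came from so queries can still prune by shard name later.
public class BloomIndexMergeSketch {

  public static byte[] merge(Map<String, byte[]> blobByShard) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(blobByShard.size());                 // header: shard count
    for (Map.Entry<String, byte[]> e : blobByShard.entrySet()) {
      out.writeUTF(e.getKey());                       // shard name
      out.writeInt(e.getValue().length);              // blob length
      out.write(e.getValue());                        // raw bloom index bytes
    }
    out.flush();
    return bos.toByteArray();
  }
}
```

The point of the design is that one sequential read per index column replaces one file open per task per load, which is where the many-segments slowdown came from.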
   
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kevinjmh/carbondata merge_bloomindex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2624.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2624
   
----
commit 271143cb342b9528e5daccfc0c5af5a2397b0ece
Author: Manhua <kevinjmh@...>
Date:   2018-08-09T11:17:02Z

    merge bloom index

----


---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6227/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7859/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6584/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6237/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7873/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6596/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6239/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7876/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6599/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6241/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7878/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6601/



---

[GitHub] carbondata issue #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom index fi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2624
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6270/



---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210214549
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.Partition
    +import org.apache.spark.rdd.CarbonMergeFilePartition
    +import org.apache.spark.SparkContext
    +import org.apache.spark.TaskContext
    +
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.datamap.bloom.BloomIndexFileStore
    +import org.apache.carbondata.spark.rdd.CarbonRDD
    +
    +
    +/**
    + * RDD to merge all bloomindex files of each segment for bloom datamap.
    --- End diff --
   
    `of each segment` => `of specified segment`


---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210211799
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---
    @@ -222,102 +220,95 @@ public DataMapBuilder createBuilder(Segment segment, String shardName,
     
       @Override
       public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
    -    List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
         try {
           Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
           if (shardPaths == null) {
    -        String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    -            getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
    -        CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    -        shardPaths = new HashSet<>();
    -        for (CarbonFile carbonFile : carbonFiles) {
    -          shardPaths.add(carbonFile.getAbsolutePath());
    -        }
    +        shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
             segmentMap.put(segment.getSegmentNo(), shardPaths);
           }
    +      Set<String> filteredShards = segment.getFilteredIndexShardNames();
           for (String shard : shardPaths) {
    -        BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
    -        bloomDM.init(new BloomDataMapModel(shard, cache));
    -        bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
    -        dataMaps.add(bloomDM);
    +        if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
    +            filteredShards.contains(new File(shard).getName())) {
    +          // Filter out the tasks which are filtered through Main datamap.
    +          // for merge shard, shard pruning delay to be done before pruning blocklet
    +          BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
    +          bloomDM.init(new BloomDataMapModel(shard, cache));
    +          bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
    +          bloomDM.setFilteredShard(filteredShards);
    +          dataMaps.add(bloomDM);
    +        }
           }
         } catch (Exception e) {
           throw new IOException("Error occurs while init Bloom DataMap", e);
         }
         return dataMaps;
       }
     
    -  @Override
    -  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
    -      throws IOException {
    -    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    -    BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
    -    String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
    -    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
    -    bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
    -        dataMapMeta.getIndexedColumns());
    -    coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
    -    return coarseGrainDataMaps;
    -  }
    -
       /**
    -   * returns all the directories of lucene index files for query
    -   * Note: copied from luceneDataMapFactory, will extract to a common interface
    +   * returns all shard directories of bloom index files for query
    +   * if bloom index files are merged we should get only one shard path
        */
    -  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
    -    List<CarbonFile> indexDirs = new ArrayList<>();
    -    List<TableDataMap> dataMaps;
    -    try {
    -      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
    -      // the path till the index file directories in all datamaps folders present in each segment
    -      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    -    } catch (IOException ex) {
    -      LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s",
    -          tablePath, segmentId));
    -      throw new RuntimeException(ex);
    -    }
    -    if (dataMaps.size() > 0) {
    -      for (TableDataMap dataMap : dataMaps) {
    -        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    -          List<CarbonFile> indexFiles;
    -          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
    -              dataMap.getDataMapSchema().getDataMapName());
    -          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
    -          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
    -          indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
    -            @Override
    -            public boolean accept(CarbonFile file) {
    -              return file.isDirectory();
    -            }
    -          }));
    -          indexDirs.addAll(indexFiles);
    +  private Set<String> getAllShardPaths(String tablePath, String segmentId) {
    +    String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
    +            tablePath, segmentId, dataMapName);
    +    CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +    Set<String> shardPaths = new HashSet<>();
    +    boolean mergeShardInprogress = false;
    +    CarbonFile mergeShardFile = null;
    +    for (CarbonFile carbonFile : carbonFiles) {
    +      if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) {
    +        mergeShardFile = carbonFile;
    +      } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
    +        mergeShardInprogress = true;
    +      } else {
    +        if (carbonFile.isDirectory()) {
    +          shardPaths.add(carbonFile.getAbsolutePath());
             }
           }
         }
    -    return indexDirs.toArray(new CarbonFile[0]);
    +    if (mergeShardFile != null && !mergeShardInprogress) {
    +      //should only get one shard path if mergeShard is generated successfully
    --- End diff --
   
    need a space after `//` before comment
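For context, the shard-resolution rule in the diff above (use `mergeShard` only when it exists and no in-progress flag is present, otherwise fall back to the per-task shards) can be sketched independently of the `CarbonFile` API. This sketch works on plain entry names and skips the directory check the real code performs:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the rule in getAllShardPaths: if the merged shard directory
// exists and no mergeShard.inprogress flag file is present, queries read
// only the merged shard; otherwise they read every original shard.
public class ShardResolutionSketch {
  static final String MERGE_SHARD = "mergeShard";
  static final String INPROGRESS = "mergeShard.inprogress";

  public static Set<String> resolveShards(List<String> entries) {
    Set<String> shards = new HashSet<>();
    boolean mergeInProgress = false;
    String mergeShard = null;
    for (String name : entries) {
      if (name.equals(MERGE_SHARD)) {
        mergeShard = name;
      } else if (name.equals(INPROGRESS)) {
        mergeInProgress = true;
      } else {
        shards.add(name);
      }
    }
    if (mergeShard != null && !mergeInProgress) {
      // merge completed: the merged shard supersedes all original shards
      shards.clear();
      shards.add(mergeShard);
    }
    return shards;
  }
}
```

This keeps queries correct during a merge: the old shards stay visible until the flag file is gone, so a half-written `mergeShard` is never read.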


---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210214357
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.Partition
    +import org.apache.spark.rdd.CarbonMergeFilePartition
    +import org.apache.spark.SparkContext
    +import org.apache.spark.TaskContext
    +
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.datamap.bloom.BloomIndexFileStore
    +import org.apache.carbondata.spark.rdd.CarbonRDD
    +
    +
    +/**
    + * RDD to merge all bloomindex files of each segment for bloom datamap.
    + *
    + * @param sc
    + * @param carbonTable
    + * @param segmentIds segments to be merged
    + * @param bloomDatamapNames list of bloom datamap
    + * @param bloomIndexColumns list of index columns correspond to datamap
    + */
    +class CarbonMergeBloomIndexFilesRDD(
    +  sc: SparkContext,
    +  carbonTable: CarbonTable,
    +  segmentIds: Seq[String],
    +  bloomDatamapNames: Seq[String],
    +  bloomIndexColumns: Seq[Seq[String]])
    +  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
    +
    +  override def getPartitions: Array[Partition] = {
    +    segmentIds.zipWithIndex.map {s =>
    +      CarbonMergeFilePartition(id, s._2, s._1)
    +    }.toArray
    +  }
    +
    +  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
    +    val tablePath = carbonTable.getTablePath
    +    val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
    +    logInfo("Merging bloom index files of segment : " + split.segmentId)
    --- End diff --
   
    s"Merging bloom index files of segment ${SEG_ID} for ${TABLE}"


---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210212518
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.datamap.bloom;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.writer.ThriftWriter;
    +import org.apache.carbondata.format.MergedBloomIndex;
    +import org.apache.carbondata.format.MergedBloomIndexHeader;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    +import org.apache.thrift.TBase;
    +
    +public class BloomIndexFileStore {
    --- End diff --
   
    Add InterfaceAudience.Internal annotation for this
    Add a comment for this class too


---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210212700
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.datamap.bloom;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.writer.ThriftWriter;
    +import org.apache.carbondata.format.MergedBloomIndex;
    +import org.apache.carbondata.format.MergedBloomIndexHeader;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    +import org.apache.thrift.TBase;
    +
    +public class BloomIndexFileStore {
    +
    +  private static final LogService LOGGER =
    +          LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
    +
    +  /*suffix of original generated file*/
    --- End diff --
   
    use `//` for one line comment


---

[GitHub] carbondata pull request #2624: [CARBONDATA-2845][BloomDataMap] Merge bloom i...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210213202
 
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.datamap.bloom;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.writer.ThriftWriter;
    +import org.apache.carbondata.format.MergedBloomIndex;
    +import org.apache.carbondata.format.MergedBloomIndexHeader;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    +import org.apache.thrift.TBase;
    +
    +public class BloomIndexFileStore {
    +
    +  private static final LogService LOGGER =
    +          LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
    +
    +  /*suffix of original generated file*/
    +  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
    +  /*suffix of merged bloom index file*/
    +  public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
    +  /* directory to store merged bloom index files */
    +  public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
    +  /**
    +   * flag file for merging
    +   * if flag file exists, query won't use mergeShard
    +   * if flag file not exists and mergeShard generated, query will use mergeShard
    +   */
    +  public static final String MERGE_INPROGRESS_FILE = "mergeShard.inprogress";
    +
    +
    +  public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {
    +    // get all shard paths of old store
    +    CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
    +            FileFactory.getFileType(dmSegmentPathString));
    +    CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter() {
    +      @Override
    +      public boolean accept(CarbonFile file) {
    +        return file.isDirectory() && !file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME);
    +      }
    +    });
    +
    +    String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
    +    String mergeInprofressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;
    +    try {
    +      // delete mergeShard folder if exists
    +      if (FileFactory.isFileExist(mergeShardPath)) {
    +        FileFactory.deleteFile(mergeShardPath, FileFactory.getFileType(mergeShardPath));
    +      }
    +      // create flag file before creating mergeShard folder
    +      if (!FileFactory.isFileExist(mergeInprofressFile)) {
    +        FileFactory.createNewFile(
    +            mergeInprofressFile, FileFactory.getFileType(mergeInprofressFile));
    +      }
    +      // prepare mergeShard output folder
    +      if (!FileFactory.mkdirs(mergeShardPath, FileFactory.getFileType(mergeShardPath))) {
    +        throw new RuntimeException("Failed to create directory " + mergeShardPath);
    +      }
    +    } catch (IOException e) {
    +      LOGGER.error(e, "Error occurs while create directory " + mergeShardPath);
    +      throw new RuntimeException("Error occurs while create directory " + mergeShardPath);
    +    }
    +
    +    // for each index column, merge the bloomindex files from all shards into one
    +    for (String indexCol: indexCols) {
    +      MergedBloomIndexHeader indexHeader = new MergedBloomIndexHeader();
    +      MergedBloomIndex mergedBloomIndex = new MergedBloomIndex();
    +      List<String> shardNames = new ArrayList<>();
    +      List<ByteBuffer> data = new ArrayList<>();
    +      try {
    +        for (CarbonFile shardPath : shardPaths) {
    +          String bloomIndexFile = getBloomIndexFile(shardPath.getCanonicalPath(), indexCol);
    +          DataInputStream dataInputStream = FileFactory.getDataInputStream(
    +                  bloomIndexFile, FileFactory.getFileType(bloomIndexFile));
    +          byte[] bytes = new byte[(int) FileFactory.getCarbonFile(bloomIndexFile).getSize()];
    +          try {
    +            dataInputStream.readFully(bytes);
    +            shardNames.add(shardPath.getName());
    +            data.add(ByteBuffer.wrap(bytes));
    +          } finally {
    +            dataInputStream.close();
    +          }
    +        }
    +        indexHeader.setShard_names(shardNames);
    +        mergedBloomIndex.setFileData(data);
    +        // write segment level file
    +        String mergeIndexFileName = getMergeBloomIndexFile(mergeShardPath, indexCol);
    +        ThriftWriter thriftWriter = new ThriftWriter(mergeIndexFileName, false);
    +        thriftWriter.open(FileWriteOperation.OVERWRITE);
    +        thriftWriter.write(indexHeader);
    +        thriftWriter.write(mergedBloomIndex);
    +        thriftWriter.close();
    +      } catch (IOException e) {
    +        LOGGER.error(e, "Error occurs while merge bloom index file of column: " + indexCol);
    +        // delete merge shard of bloom index for this segment when failed
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
    +        throw new RuntimeException(
    +                "Error occurs while merge bloom index file of column: " + indexCol);
    +      }
    +    }
    +    // delete flag file and mergeShard can be used
    +    try {
    +      FileFactory.deleteFile(mergeInprofressFile, FileFactory.getFileType(mergeInprofressFile));
    +    } catch (IOException e) {
    +      LOGGER.error(e, "Error occurs while deleting file " + mergeInprofressFile);
    +      throw new RuntimeException("Error occurs while deleting file " + mergeInprofressFile);
    +    }
    +    // remove old store
    +    for (CarbonFile shardpath: shardPaths) {
    +      FileFactory.deleteAllCarbonFilesOfDir(shardpath);
    +    }
    +  }
    +
    +  /**
    +   * load bloom filter from bloom index file
    +   * @param shardPath
    --- End diff --
   
    remove these useless lines if they do not provide more information
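Stepping back from the javadoc nit, the in-progress flag protocol in the quoted diff (create flag, rebuild `mergeShard`, delete flag, then drop old shards) can be sketched as an ordering of plain file operations. Paths and the helper name here are illustrative only, not the PR's actual API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the merge protocol's ordering guarantee: while the
// mergeShard.inprogress flag exists, readers ignore mergeShard, so the
// merged output only becomes visible once the flag is deleted.
public class MergeProtocolSketch {
  public static void mergeWithFlag(Path segmentDir, Runnable doMerge) throws IOException {
    Path flag = segmentDir.resolve("mergeShard.inprogress");
    Path mergeShard = segmentDir.resolve("mergeShard");
    Files.createFile(flag);              // 1. readers now ignore mergeShard
    Files.createDirectories(mergeShard); // 2. prepare merged output dir
    doMerge.run();                       // 3. write merged bloom index files
    Files.delete(flag);                  // 4. merged shard becomes visible
  }
}
```

If the merge fails between steps 1 and 4, the flag remains, so readers keep using the original shards; the diff additionally deletes the partial `mergeShard` on failure.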


---