[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...


[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r184914643
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---
    @@ -277,4 +298,30 @@ public DataMapMeta getMeta() {
         }
         return blocklets;
       }
    +
    +  @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
    +      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException {
    +    cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper);
    +  }
    +
    +  @Override
    +  public List<DataMapDistributable> getAllUncachedDistributables(
    +      List<DataMapDistributable> distributables) throws IOException {
    +    List<DataMapDistributable> distributablesToBeLoaded = new ArrayList<>(distributables.size());
    +    for (DataMapDistributable distributable : distributables) {
    +      Segment segment = distributable.getSegment();
    +      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
    +          getTableBlockIndexUniqueIdentifiers(segment);
    +      // filter out the tableBlockIndexUniqueIdentifiers based on distributable
    +      TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
    +          .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
    +              (BlockletDataMapDistributable) distributable);
    +      ((BlockletDataMapDistributable) distributable)
    +          .setTableBlockIndexUniqueIdentifier(validIdentifier);
    --- End diff --
   
    Move this line inside the if check below.


---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r184915870
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (FileFactory.isFileExist(blockPath)) {
    --- End diff --
   
    Is this if check required? It will make a namenode call for every blocklet entry here, so it is better to remove it.
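The cost the reviewer is pointing at can be sketched as follows. This is a minimal, self-contained illustration, not CarbonData code: the namenode is replaced by a fake that counts RPCs, and `exists`/`listSegment` are hypothetical stand-ins for `FileFactory.isFileExist` and a directory listing. It shows why an existence check inside the per-footer loop scales linearly in RPCs, while listing the segment once costs a single call.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Fake "namenode": every exists/list call bumps a counter so the two
// access patterns can be compared.
public class NamenodeCallSketch {

  static int rpcCalls = 0;

  // Stand-in for FileFactory.isFileExist: one RPC per check.
  static boolean exists(Set<String> filesOnDisk, String path) {
    rpcCalls++;
    return filesOnDisk.contains(path);
  }

  // Stand-in for listing the segment directory: a single RPC total.
  static Set<String> listSegment(Set<String> filesOnDisk) {
    rpcCalls++;
    return new HashSet<>(filesOnDisk);
  }

  // Per-file pattern (the one under review): RPCs grow with block count.
  static int countExistingPerFile(Set<String> filesOnDisk, List<String> blockPaths) {
    int existing = 0;
    for (String path : blockPaths) {
      if (exists(filesOnDisk, path)) {
        existing++;
      }
    }
    return existing;
  }

  // List-once pattern: one RPC, then pure in-memory membership tests.
  static int countExistingListOnce(Set<String> filesOnDisk, List<String> blockPaths) {
    Set<String> listed = listSegment(filesOnDisk);
    int existing = 0;
    for (String path : blockPaths) {
      if (listed.contains(path)) {
        existing++;
      }
    }
    return existing;
  }
}
```

With three block paths, the per-file pattern issues three RPCs where the list-once pattern issues one; the gap widens with every additional blocklet entry.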


---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4415/



---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185435135
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (FileFactory.isFileExist(blockPath)) {
    --- End diff --
   
    This check is required in case of update/delete operations.


---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5576/



---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185484269
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (FileFactory.isFileExist(blockPath)) {
    --- End diff --
   
    fileNameToMetaInfoMapping will contain all the existing carbondata files. I think you can remove this namenode call and use this map to check whether a file physically exists or not.
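A minimal sketch of this suggestion, assuming a simplified shape: `BlockMetaInfo` is reduced to a `String` and the footer loop to a list of block paths, so the example is self-contained rather than CarbonData's actual types. The idea is that since `fileNameToMetaInfoMapping` was built from a single directory listing, a map lookup can replace the per-file `FileFactory.isFileExist` call.

```java
import java.util.HashMap;
import java.util.Map;

public class MapBasedExistence {

  // Populates blockMetaInfoMap from the listing-derived mapping.
  // Presence in the map stands in for a physical-existence check:
  // blocks removed by update/delete are simply absent, so no extra
  // namenode call is needed per entry.
  static Map<String, String> collect(Map<String, String> fileNameToMetaInfoMapping,
      Iterable<String> blockPaths) {
    Map<String, String> blockMetaInfoMap = new HashMap<>();
    for (String blockPath : blockPaths) {
      String metaInfo = fileNameToMetaInfoMapping.get(blockPath);
      if (metaInfo != null && !blockMetaInfoMap.containsKey(blockPath)) {
        blockMetaInfoMap.put(blockPath, metaInfo);
      }
    }
    return blockMetaInfoMap;
  }
}
```

This keeps the update/delete safety the earlier comment asked for (stale paths fall out of the result) while paying only the one listing call made when the mapping was created.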


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4427/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5588/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4683/



---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185794118
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---
    @@ -159,16 +159,18 @@
           FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
         }
         for (Blocklet blocklet : blocklets) {
    -      ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
    -          .getExtendedBlocklet(blocklet, distributable.getSegment());
    -      if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
    -        String blockletwritePath =
    -            writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
    -        detailedBlocklet.setDataMapWriterPath(blockletwritePath);
    -        serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
    +      List<ExtendedBlocklet> detailedBlockletsList =
    +          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
    --- End diff --
   
    The blocklets are already pruned, so one Blocklet should always map to exactly one ExtendedBlocklet. How can it return a list?


---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185796999
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---
    @@ -124,32 +132,41 @@ public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
           }
           return detailedBlocklets;
         }
    -    List<TableBlockIndexUniqueIdentifier> identifiers =
    +
    +    Set<TableBlockIndexUniqueIdentifier> identifiers =
             getTableBlockIndexUniqueIdentifiers(segment);
         // Retrieve each blocklets detail information from blocklet datamap
         for (Blocklet blocklet : blocklets) {
    -      detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
    +      detailedBlocklets.addAll(getExtendedBlocklet(identifiers, blocklet));
         }
         return detailedBlocklets;
       }
     
       @Override
    -  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
    +  public List<ExtendedBlocklet> getExtendedBlocklet(Blocklet blocklet, Segment segment)
           throws IOException {
    +    List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
         if (blocklet instanceof ExtendedBlocklet) {
    -      return (ExtendedBlocklet) blocklet;
    +      extendedBlocklets.add((ExtendedBlocklet) blocklet);
    +      return extendedBlocklets;
         }
    -    List<TableBlockIndexUniqueIdentifier> identifiers =
    +    Set<TableBlockIndexUniqueIdentifier> identifiers =
             getTableBlockIndexUniqueIdentifiers(segment);
         return getExtendedBlocklet(identifiers, blocklet);
       }
     
    -  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
    -      Blocklet blocklet) throws IOException {
    +  private List<ExtendedBlocklet> getExtendedBlocklet(
    +      Set<TableBlockIndexUniqueIdentifier> identifiers, Blocklet blocklet) throws IOException {
    --- End diff --
   
    It should return only one blocklet, not a list.


---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185804179
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (null == blockMetaInfoMap.get(blockPath)) {
    +        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
    +      }
    +    }
    +    return blockMetaInfoMap;
    +  }
    +
    +  /**
    +   * This method will create file name to block Meta Info Mapping. This method will reduce the
    +   * number of namenode calls and using this method one namenode will fetch 1000 entries
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   * @throws IOException
    +   */
    +  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
    +      String segmentFilePath) throws IOException {
    +    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
    +    if (carbonFile instanceof AbstractDFSCarbonFile) {
    +      Path path = new Path(segmentFilePath);
    +      RemoteIterator<LocatedFileStatus> iter =
    +          ((AbstractDFSCarbonFile) carbonFile).fs.listLocatedStatus(path);
    +      PathFilter pathFilter = new PathFilter() {
    +        @Override public boolean accept(Path path) {
    +          return CarbonTablePath.isCarbonDataFile(path.getName());
    +        }
    +      };
    +      while (iter.hasNext()) {
    +        LocatedFileStatus fileStatus = iter.next();
    +        if (pathFilter.accept(fileStatus.getPath())) {
    +          String[] location = fileStatus.getBlockLocations()[0].getHosts();
    +          long len = fileStatus.getLen();
    +          BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
    +          fileNameToMetaInfoMapping.put(fileStatus.getPath().toString(), blockMetaInfo);
    +        }
    +      }
    +    }
    +    return fileNameToMetaInfoMapping;
    +  }
    +
    +  private static BlockMetaInfo createBlockMetaInfo(
    +      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile)
    +      throws IOException {
    +    FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
    +    switch (fileType) {
    +      case LOCAL:
    +        CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
    +        return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
    +      default:
    +        return fileNameToMetaInfoMapping.get(carbonDataFile);
    +    }
    +  }
    +
    +  public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment)
    +      throws IOException {
    +    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
    +    Map<String, String> indexFiles = segment.getCommittedIndexFile();
    +    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
    +      Path indexFile = new Path(indexFileEntry.getKey());
    +      tableBlockIndexUniqueIdentifiers.add(
    +          new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
    +              indexFileEntry.getValue(), segment.getSegmentNo()));
    +    }
    +    return tableBlockIndexUniqueIdentifiers;
    +  }
    +
    +  /**
    +   * This method will filter out the TableBlockIndexUniqueIdentifier belongs to that distributable
    +   *
    +   * @param tableBlockIndexUniqueIdentifiers
    +   * @param distributable
    +   * @return
    +   */
    +  public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
    +      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
    +      BlockletDataMapDistributable distributable) {
    +    TableBlockIndexUniqueIdentifier validIdentifier = null;
    +    for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
    +        tableBlockIndexUniqueIdentifiers) {
    +      if (distributable.getFilePath().equals(
    --- End diff --
   
    Better to get the file name from the distributable and compare only the file names instead of comparing the complete paths.
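A small self-contained sketch of the suggested comparison. The helper below mimics what `org.apache.hadoop.fs.Path#getName` would give (last path component); the set of identifier file names is a simplified stand-in for the `TableBlockIndexUniqueIdentifier` set, since full paths can differ in scheme or authority (e.g. `hdfs://ns1/store/...` vs `/store/...`) even when they refer to the same index file.

```java
import java.util.Set;

public class FileNameMatch {

  // Last path component, analogous to Hadoop's Path#getName.
  static String fileName(String filePath) {
    int idx = filePath.lastIndexOf('/');
    return idx < 0 ? filePath : filePath.substring(idx + 1);
  }

  // Returns the identifier file name that belongs to the distributable,
  // matching on file name only, or null when none matches.
  static String filterByFileName(Set<String> identifierFileNames,
      String distributableFilePath) {
    String target = fileName(distributableFilePath);
    for (String name : identifierFileNames) {
      if (name.equals(target)) {
        return name;
      }
    }
    return null;
  }
}
```

Matching on the file name alone is also cheaper per comparison, since index file names within a segment are already unique.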


---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185804764
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (null == blockMetaInfoMap.get(blockPath)) {
    +        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
    +      }
    +    }
    +    return blockMetaInfoMap;
    +  }
    +
    +  /**
    +   * This method creates a file name to BlockMetaInfo mapping. It reduces the number of
    +   * namenode calls: a single listing request fetches up to 1000 entries at once
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   * @throws IOException
    +   */
    +  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
    +      String segmentFilePath) throws IOException {
    +    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
    +    if (carbonFile instanceof AbstractDFSCarbonFile) {
    +      Path path = new Path(segmentFilePath);
    +      RemoteIterator<LocatedFileStatus> iter =
    +          ((AbstractDFSCarbonFile) carbonFile).fs.listLocatedStatus(path);
    +      PathFilter pathFilter = new PathFilter() {
    +        @Override public boolean accept(Path path) {
    +          return CarbonTablePath.isCarbonDataFile(path.getName());
    +        }
    +      };
    +      while (iter.hasNext()) {
    +        LocatedFileStatus fileStatus = iter.next();
    +        if (pathFilter.accept(fileStatus.getPath())) {
    +          String[] location = fileStatus.getBlockLocations()[0].getHosts();
    +          long len = fileStatus.getLen();
    +          BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
    +          fileNameToMetaInfoMapping.put(fileStatus.getPath().toString(), blockMetaInfo);
    +        }
    +      }
    +    }
    +    return fileNameToMetaInfoMapping;
    +  }
    +
    +  private static BlockMetaInfo createBlockMetaInfo(
    +      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile)
    +      throws IOException {
    --- End diff --
   
    No need to declare `throws` on this method


---

[GitHub] carbondata pull request #2244: [CARBONDATA-2310] Refactored code to improve ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2244#discussion_r185807371
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.BlockMetaInfo;
    +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +
    +public class BlockletDataMapUtil {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockletDataMapUtil.class.getName());
    +
    +  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
    +      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
    +      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
    +      throws IOException {
    +    if (identifier.getMergeIndexFileName() != null
    +        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getMergeIndexFileName());
    +      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
    +        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
    +        filesRead.add(indexMergeFile.getPath());
    +      }
    +    }
    +    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
    +      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
    +          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +              .getIndexFileName()) });
    +    }
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
    +    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
    +        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
    +            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
    +    for (DataFileFooter footer : indexInfo) {
    +      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
    +      if (null == blockMetaInfoMap.get(blockPath)) {
    +        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
    +      }
    +    }
    +    return blockMetaInfoMap;
    +  }
    +
    +  /**
    +   * This method creates a file name to BlockMetaInfo mapping. It reduces the number of
    +   * namenode calls: a single listing request fetches up to 1000 entries at once
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   * @throws IOException
    +   */
    +  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
    +      String segmentFilePath) throws IOException {
    +    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
    --- End diff --
   
    You can directly call `carbonFile.locationAwareListFiles()` to get this information, instead of writing the listing code by hand
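The batching idea behind `createCarbonDataFileBlockMetaInfoMapping` — one directory listing populating a whole map, instead of one remote metadata lookup per data file — can be illustrated with a self-contained sketch in plain `java.nio.file`. The `mapFileSizes` helper and file names below are hypothetical; the real code goes through the CarbonData `CarbonFile` API (e.g. the `locationAwareListFiles()` call the reviewer suggests) and also records block locations, which a local filesystem cannot demonstrate.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

public class BulkListing {
    // One listing call fills the whole map, mirroring how the Carbon code
    // batches namenode calls: a single listing returns many entries at once
    // instead of issuing one per-file status call.
    static Map<String, Long> mapFileSizes(Path segmentDir) throws IOException {
        Map<String, Long> sizes = new TreeMap<>();
        // the glob plays the role of the PathFilter on .carbondata files
        try (DirectoryStream<Path> stream =
                 Files.newDirectoryStream(segmentDir, "*.carbondata")) {
            for (Path p : stream) {
                sizes.put(p.toString(), Files.size(p));
            }
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("segment_0");
        Files.write(dir.resolve("part-0.carbondata"), new byte[] {1, 2, 3});
        Files.write(dir.resolve("part-0.carbonindex"), new byte[] {1});
        Map<String, Long> sizes = mapFileSizes(dir);
        System.out.println(sizes.size()); // prints 1 (only the .carbondata file)
    }
}
```

Per-file queries are then answered from the in-memory map, so the number of remote calls no longer grows with the number of data files in the segment.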


---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4471/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5631/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4478/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5639/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4482/



---

[GitHub] carbondata issue #2244: [CARBONDATA-2310] Refactored code to improve Distrib...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2244
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4720/



---