[GitHub] carbondata pull request #1984: [WIP] Partition restructure


[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169520945
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---
    @@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) {
     
       @Override
       public void freeMemory() {
    -
    +    byteArrayData = null;
    --- End diff --
   
    Other typed buffers like intData and longData can also be in use, so please set the references for all the types to null here.
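The fix the reviewer asks for can be sketched as follows. This is a hypothetical simplification of `SafeDecimalColumnPage` (the class and field names here are stand-ins, not the actual CarbonData code): `freeMemory()` nulls every typed buffer so the GC can reclaim whichever one was populated.

```java
// Hypothetical sketch: freeMemory() releases all typed buffer references,
// not only byteArrayData.
class ColumnPageSketch {
  private byte[] byteArrayData = new byte[16];
  private int[] intData = new int[16];
  private long[] longData = new long[16];

  void freeMemory() {
    // Null out every buffer; only one is typically in use, but nulling
    // all of them is safe and covers every data type.
    byteArrayData = null;
    intData = null;
    longData = null;
  }

  boolean allFreed() {
    return byteArrayData == null && intData == null && longData == null;
  }
}
```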


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169540773
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -79,6 +87,44 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException {
         }
       }
     
    +  /**
    +   * Read all index files and keep the cache in it.
    +   *
    +   * @param segmentFileStore
    +   * @throws IOException
    +   */
    +  public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
    +      boolean ignoreStatus) throws IOException {
    +    List<CarbonFile> carbonIndexFiles = new ArrayList<>();
    +    if (segmentFileStore.getLocationMap() == null) {
    +      return;
    +    }
    +    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
    +        .getLocationMap().entrySet()) {
    +      String location = locations.getKey();
    +      if (locations.getValue().isRelative()) {
    +        location =
    --- End diff --
   
    The fetching and modification of the location variable can be done after the if check below.


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169522942
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---
    @@ -17,91 +17,59 @@
     
     package org.apache.carbondata.core.indexstore;
     
    +import java.util.Objects;
    +
     import org.apache.carbondata.core.constants.CarbonCommonConstants;
    -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    -import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
     
     /**
    - * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
    + * Class holds the absoluteTableIdentifier and segment to uniquely identify a segment
    --- End diff --
   
    Modify the comment to remove absoluteTableIdentifier


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169537887
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---
    @@ -660,22 +650,23 @@ public boolean isScanRequired(FilterResolverIntf filterExp) {
         return blocklets;
       }
     
    -  @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
    +  @Override
    +  public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
         if (unsafeMemoryDMStore.getRowCount() == 0) {
           return new ArrayList<>();
         }
    -    // First get the partitions which are stored inside datamap.
    -    List<String> storedPartitions = getPartitions();
         // if it has partitioned datamap but there is no partitioned information stored, it means
         // partitions are dropped so return empty list.
    -    if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
    -      return new ArrayList<>();
    -    }
    -    if (storedPartitions != null && storedPartitions.size() > 0) {
    +    if (partitions != null) {
    +      // First get the partitions which are stored inside datamap.
    +      String[] fileDetails = getFileDetails();
           // Check the exact match of partition information inside the stored partitions.
           boolean found = false;
    -      if (partitions != null && partitions.size() > 0) {
    -        found = partitions.containsAll(storedPartitions);
    +      Path folderPath = new Path(fileDetails[0]);
    +      for (PartitionSpec spec : partitions) {
    +        if (folderPath.equals(spec.getLocation())) {
    +          found = true;
    --- End diff --
   
    Break the loop once a match is found.
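The reviewer's point, reduced to a minimal stand-alone sketch (plain `String` locations instead of the `PartitionSpec`/`Path` types in the diff): returning or breaking on the first match avoids scanning the remaining partitions needlessly.

```java
import java.util.List;

class BreakOnMatch {
  // Returns as soon as a candidate matches the folder path, which is
  // equivalent to setting found = true and breaking out of the loop.
  static boolean containsLocation(String folderPath, List<String> locations) {
    for (String loc : locations) {
      if (folderPath.equals(loc)) {
        return true; // stop scanning once found
      }
    }
    return false;
  }
}
```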


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169540997
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -161,9 +212,13 @@ private void readMergeFile(String mergeFilePath) throws IOException {
         MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
         List<String> file_names = indexHeader.getFile_names();
         List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
    +    CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
         assert (file_names.size() == fileData.size());
         for (int i = 0; i < file_names.size(); i++) {
           carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
    +      carbonIndexMapWithFullPath.put(
    +          mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
    +              + file_names.get(i), fileData.get(i).array());
         }
         thriftReader.close();
    --- End diff --
   
    Use a try-finally block and close the thrift reader inside the finally block.
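The suggested pattern, shown with a hypothetical stand-in reader class (`FakeReader` is not a CarbonData API): the `finally` clause guarantees the reader is closed even when the body throws mid-read.

```java
class TryFinallySketch {
  // Stand-in for ThriftReader, purely for illustration.
  static class FakeReader {
    boolean closed = false;
    String read() { return "payload"; }
    void close() { closed = true; }
  }

  static String readAndClose(FakeReader reader) {
    try {
      return reader.read();
    } finally {
      reader.close(); // runs even if read() throws
    }
  }
}
```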


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169518845
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Represents one load of carbondata
    + */
    +public class Segment implements Serializable {
    +
    +  private static final long serialVersionUID = 7044555408162234064L;
    +
    +  private String segmentId;
    +
    +  private String segmentFileName;
    +
    +  public Segment(String segmentId, String segmentFileName) {
    +    this.segmentId = segmentId;
    +    this.segmentFileName = segmentFileName;
    +  }
    +
    +  public String getSegmentId() {
    +    return segmentId;
    +  }
    +
    +  public String getSegmentFileName() {
    +    return segmentFileName;
    +  }
    +
    +  public static List<Segment> toSegmentList(String[] segmentIds) {
    +    List<Segment> list = new ArrayList<>(segmentIds.length);
    +    for (String segmentId : segmentIds) {
    +      list.add(toSegment(segmentId));
    +    }
    +    return list;
    +  }
    +
    +  public static List<Segment> toSegmentList(List<String> segmentIds) {
    +    List<Segment> list = new ArrayList<>(segmentIds.size());
    +    for (String segmentId : segmentIds) {
    +      list.add(toSegment(segmentId));
    +    }
    +    return list;
    +  }
    +
    +  public static Segment toSegment(String segmentId) {
    +    String[] split = segmentId.split("#");
    +    if (split.length > 1) {
    +      return new Segment(split[0], split[1]);
    +    } else if (split.length > 0) {
    +      return new Segment(split[0], null);
    +    }
    +    return new Segment(segmentId, null);
    +  }
    +
    +  @Override public boolean equals(Object o) {
    +    if (this == o) return true;
    +    if (o == null || getClass() != o.getClass()) return false;
    +    Segment segment = (Segment) o;
    +    return Objects.equals(segmentId, segment.segmentId);
    +  }
    +
    +  @Override public int hashCode() {
    +
    --- End diff --
   
    Remove this extra line.


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169518712
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Represents one load of carbondata
    + */
    +public class Segment implements Serializable {
    +
    +  private static final long serialVersionUID = 7044555408162234064L;
    +
    +  private String segmentId;
    +
    +  private String segmentFileName;
    +
    +  public Segment(String segmentId, String segmentFileName) {
    +    this.segmentId = segmentId;
    +    this.segmentFileName = segmentFileName;
    +  }
    +
    +  public String getSegmentId() {
    +    return segmentId;
    +  }
    +
    +  public String getSegmentFileName() {
    +    return segmentFileName;
    +  }
    +
    +  public static List<Segment> toSegmentList(String[] segmentIds) {
    +    List<Segment> list = new ArrayList<>(segmentIds.length);
    +    for (String segmentId : segmentIds) {
    +      list.add(toSegment(segmentId));
    +    }
    +    return list;
    +  }
    +
    +  public static List<Segment> toSegmentList(List<String> segmentIds) {
    +    List<Segment> list = new ArrayList<>(segmentIds.size());
    +    for (String segmentId : segmentIds) {
    +      list.add(toSegment(segmentId));
    +    }
    +    return list;
    +  }
    +
    +  public static Segment toSegment(String segmentId) {
    +    String[] split = segmentId.split("#");
    +    if (split.length > 1) {
    +      return new Segment(split[0], split[1]);
    +    } else if (split.length > 0) {
    +      return new Segment(split[0], null);
    +    }
    +    return new Segment(segmentId, null);
    --- End diff --
   
    Please provide a comment for this method with an example covering the if, else-if, and else cases.
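One possible shape for the requested comment, using a simplified stand-in class (`SegmentSketch` mirrors only the parsing logic of `Segment.toSegment`; the example values in the Javadoc are hypothetical):

```java
class SegmentSketch {
  final String segmentId;
  final String segmentFileName;

  SegmentSketch(String segmentId, String segmentFileName) {
    this.segmentId = segmentId;
    this.segmentFileName = segmentFileName;
  }

  /**
   * Parses an "id#fileName" string into a segment. Examples (hypothetical):
   *   "2#2_1517222.segment" -> segmentId "2", segmentFileName "2_1517222.segment"
   *   "2"                   -> segmentId "2", segmentFileName null
   * Note that split() on a non-empty string always yields at least one
   * element, so the single-element branch handles plain segment ids.
   */
  static SegmentSketch toSegment(String value) {
    String[] split = value.split("#");
    if (split.length > 1) {
      return new SegmentSketch(split[0], split[1]);
    }
    return new SegmentSketch(split[0], null);
  }
}
```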


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169542459
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    --- End diff --
   
    Move this complete if check inside the if check below: {if (carbonFiles != null && carbonFiles.length > 0)}
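The reordering the reviewer suggests amounts to deferring the relative-path computation until we know there is work to do. A hedged sketch (`toRelative` is a hypothetical helper, not the actual CarbonData code):

```java
class DeferredCheckSketch {
  // The startsWith/substring work is only performed once the file listing
  // confirms there are index files to record for this location.
  static String toRelative(String location, String tablePath, String[] files) {
    if (files != null && files.length > 0) {
      if (location.startsWith(tablePath)) {
        return location.substring(tablePath.length());
      }
    }
    return location;
  }
}
```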


---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3800/



---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2557/



---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169543287
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in json format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merge all segment files in a segment to single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    +      }
    +      return segmentFile;
    +    }
    +    return null;
    +  }
    +
    +  private CarbonFile[] getSegmentFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    if (carbonFile.exists()) {
    +      return carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
    +        }
    +      });
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * It provides segment file only for the partitions which has physical index files.
    +   *
    +   * @param partitionSpecs
    +   */
    +  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
    +      List<PartitionSpec> partitionSpecs) {
    +    SegmentFile segmentFile = null;
    +    for (PartitionSpec spec : partitionSpecs) {
    +      String location = spec.getLocation().toString();
    +      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
    +      boolean isRelative = false;
    +      if (location.startsWith(tablePath)) {
    --- End diff --
   
    Move this if check after the if check below: {if (listFiles != null && listFiles.length > 0)}


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169543473
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in json format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merge all segment files in a segment to a single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    --- End diff --
   
    Use try/finally here and call FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)) inside the finally block, so the temporary read directory is cleaned up even if writing the merged file fails.
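The suggested restructure can be sketched as below. This is a minimal illustration of the try/finally cleanup pattern only; plain java.nio stands in for CarbonData's FileFactory/CarbonFile API, and all names here are hypothetical, not the PR's actual code.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.charset.StandardCharsets;

public class MergeCleanupSketch {

  // Stand-in for mergeSegmentFiles: write the merged file in the try block,
  // delete the temporary read directory in finally so cleanup always happens.
  static void mergeAndCleanup(Path readDir, Path mergedFile) throws IOException {
    try {
      // stand-in for writeSegmentFile(segmentFile, path)
      Files.write(mergedFile, "merged-content".getBytes(StandardCharsets.UTF_8));
    } finally {
      // stand-in for FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath))
      if (Files.exists(readDir)) {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(readDir)) {
          for (Path p : entries) {
            Files.delete(p);
          }
        }
        Files.delete(readDir);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path readDir = Files.createTempDirectory("segtmp");
    Files.write(readDir.resolve("0.segment"), new byte[] {1});
    Path merged = Files.createTempFile("merged", ".segment");
    mergeAndCleanup(readDir, merged);
    // the temporary directory is removed in finally
    System.out.println(Files.exists(readDir));
  }
}
```

Because the delete sits in finally, a failure while writing the merged file can no longer leave the stale per-task segment files behind.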


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169544464
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in json format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merge all segment files in a segment to a single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    +      }
    +      return segmentFile;
    +    }
    +    return null;
    +  }
    +
    +  private CarbonFile[] getSegmentFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    if (carbonFile.exists()) {
    +      return carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
    +        }
    +      });
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * It provides the segment file only for the partitions which have physical index files.
    +   *
    +   * @param partitionSpecs
    +   */
    +  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
    +      List<PartitionSpec> partitionSpecs) {
    +    SegmentFile segmentFile = null;
    +    for (PartitionSpec spec : partitionSpecs) {
    +      String location = spec.getLocation().toString();
    +      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
    +      boolean isRelative = false;
    +      if (location.startsWith(tablePath)) {
    +        location = location.substring(tablePath.length(), location.length());
    +        isRelative = true;
    +      }
    +      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
    +        }
    +      });
    +      if (listFiles != null && listFiles.length > 0) {
    +        SegmentFile localSegmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(spec.getPartitions());
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : listFiles) {
    +          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +            folderDetails.setMergeFileName(file.getName());
    +          } else {
    +            folderDetails.getFiles().add(file.getName());
    +          }
    +        }
    +        locationMap.put(location, folderDetails);
    +        localSegmentFile.setLocationMap(locationMap);
    +        if (segmentFile == null) {
    +          segmentFile = localSegmentFile;
    +        } else {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +    }
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * This method reads the segment file which is written in json format
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   */
    +  private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
    +    Gson gsonObjectToRead = new Gson();
    +    DataInputStream dataInputStream = null;
    +    BufferedReader buffReader = null;
    +    InputStreamReader inStream = null;
    +    SegmentFile segmentFile;
    +    AtomicFileOperations fileOperation =
    +        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
    +
    +    try {
    +      if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
    +        return null;
    +      }
    +      dataInputStream = fileOperation.openForRead();
    +      inStream = new InputStreamReader(dataInputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
    +      buffReader = new BufferedReader(inStream);
    +      segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
    +    } finally {
    +      if (inStream != null) {
    +        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
    +      }
    +    }
    +
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * Reads segment file.
    +   */
    +  public void readSegment(String tablePath, String segmentFileName) throws IOException {
    +    String segmentFilePath =
    +        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
    +            + segmentFileName;
    +    SegmentFile segmentFile = readSegmentFile(segmentFilePath);
    +    this.tablePath = tablePath;
    +    this.segmentFile = segmentFile;
    --- End diff --
   
    Here we are directly updating the member variable. If this method is called from multiple places, concurrent operations can cause problems. Kindly re-verify and, if possible, avoid updating the member variables from methods that can be called from multiple places.
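One way to address the concern is to return the parsed result instead of storing it in the instance, so each caller gets its own snapshot and no shared state is mutated. The sketch below is purely illustrative; the class and field names are hypothetical stand-ins, not CarbonData's actual SegmentFile API.

```java
public class ReadWithoutMutation {

  // Immutable snapshot handed back to the caller; stands in for the parsed
  // SegmentFile plus the tablePath that readSegment currently assigns to fields.
  static final class SegmentSnapshot {
    final String tablePath;
    final String segmentFileName;

    SegmentSnapshot(String tablePath, String segmentFileName) {
      this.tablePath = tablePath;
      this.segmentFileName = segmentFileName;
    }
  }

  // Returns a fresh snapshot instead of assigning this.tablePath / this.segmentFile,
  // so concurrent callers never observe each other's state.
  static SegmentSnapshot readSegment(String tablePath, String segmentFileName) {
    // (real code would read and parse the segment file here before building the snapshot)
    return new SegmentSnapshot(tablePath, segmentFileName);
  }

  public static void main(String[] args) {
    SegmentSnapshot a = readSegment("/tables/t1", "0_1.segment");
    SegmentSnapshot b = readSegment("/tables/t2", "0_2.segment");
    // each call produced an independent result; nothing was overwritten
    System.out.println(a.tablePath);
    System.out.println(b.tablePath);
  }
}
```

With this shape, two threads calling readSegment on the same store instance can never clobber each other's segmentFile/tablePath, which is the race the reviewer is pointing at.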


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169544627
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in json format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merge all segment files in a segment to a single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    +      }
    +      return segmentFile;
    +    }
    +    return null;
    +  }
    +
    +  private CarbonFile[] getSegmentFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    if (carbonFile.exists()) {
    +      return carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
    +        }
    +      });
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * It provides the segment file only for the partitions which have physical index files.
    +   *
    +   * @param partitionSpecs
    +   */
    +  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
    +      List<PartitionSpec> partitionSpecs) {
    +    SegmentFile segmentFile = null;
    +    for (PartitionSpec spec : partitionSpecs) {
    +      String location = spec.getLocation().toString();
    +      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
    +      boolean isRelative = false;
    +      if (location.startsWith(tablePath)) {
    +        location = location.substring(tablePath.length(), location.length());
    +        isRelative = true;
    +      }
    +      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
    +        }
    +      });
    +      if (listFiles != null && listFiles.length > 0) {
    +        SegmentFile localSegmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(spec.getPartitions());
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : listFiles) {
    +          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +            folderDetails.setMergeFileName(file.getName());
    +          } else {
    +            folderDetails.getFiles().add(file.getName());
    +          }
    +        }
    +        locationMap.put(location, folderDetails);
    +        localSegmentFile.setLocationMap(locationMap);
    +        if (segmentFile == null) {
    +          segmentFile = localSegmentFile;
    +        } else {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +    }
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * This method reads the segment file which is written in json format
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   */
    +  private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
    +    Gson gsonObjectToRead = new Gson();
    +    DataInputStream dataInputStream = null;
    +    BufferedReader buffReader = null;
    +    InputStreamReader inStream = null;
    +    SegmentFile segmentFile;
    +    AtomicFileOperations fileOperation =
    +        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
    +
    +    try {
    +      if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
    +        return null;
    +      }
    +      dataInputStream = fileOperation.openForRead();
    +      inStream = new InputStreamReader(dataInputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
    +      buffReader = new BufferedReader(inStream);
    +      segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
    +    } finally {
    +      if (inStream != null) {
    +        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
    +      }
    +    }
    +
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * Reads segment file.
    +   */
    +  public void readSegment(String tablePath, String segmentFileName) throws IOException {
    +    String segmentFilePath =
    +        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
    +            + segmentFileName;
    +    SegmentFile segmentFile = readSegmentFile(segmentFilePath);
    +    this.tablePath = tablePath;
    +    this.segmentFile = segmentFile;
    +  }
    +
    +  public String getTablePath() {
    +    return tablePath;
    +  }
    +
    +  /**
    +   * Gets all the index files and related carbondata files from this segment. The user must
    +   * call the @readIndexFiles method before calling it.
    +   * @return
    +   */
    +  public Map<String, List<String>> getIndexFilesMap() {
    +    return indexFilesMap;
    +  }
    +
    +  /**
    +   * Reads all index files which are located in this segment. The user must call the
    +   * @readSegment method before calling it.
    +   * @throws IOException
    +   */
    +  public void readIndexFiles() throws IOException {
    +    readIndexFiles(SegmentStatus.SUCCESS, false);
    +  }
    +
    +  /**
    +   * Reads all index files matching the given status. If @ignoreStatus is true, it reads
    +   * all index files regardless of status.
    +   * @param status
    +   * @param ignoreStatus
    +   * @throws IOException
    +   */
    +  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
    +    if (indexFilesMap != null) {
    +      return;
    +    }
    +    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    +    indexFilesMap = new HashMap<>();
    +    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
    +    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
    +      List<DataFileFooter> indexInfo =
    +          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
    +      List<String> blocks = new ArrayList<>();
    +      for (DataFileFooter footer : indexInfo) {
    +        blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
    +      }
    +      indexFilesMap.put(entry.getKey(), blocks);
    +    }
    +  }
    +
    +  /**
    +   * Gets all index files from this segment
    +   * @return
    +   */
    +  public Map<String, String> getIndexFiles() {
    +    Map<String, String> indexFiles = new HashMap<>();
    +    if (segmentFile != null) {
    +      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
    +        String location = entry.getKey();
    +        if (entry.getValue().isRelative) {
    +          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
    +        }
    +        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
    +          for (String indexFile : entry.getValue().getFiles()) {
    +            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
    +                entry.getValue().mergeFileName);
    +          }
    +        }
    +      }
    +    }
    +    return indexFiles;
    +  }
    +
    +  /**
    +   * Drops the partition-related entries from the segment file and writes the result to a
    +   * new file. It first iterates over the segment file, checks whether each path needs to be
    +   * dropped, and updates the status to deleted when a match is found.
    +   *
    +   * @param uniqueId
    +   * @throws IOException
    +   */
    +  public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
    +      String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
    +      throws IOException {
    +    readSegment(tablePath, segment.getSegmentFileName());
    +    boolean updateSegment = false;
    +    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
    +      String location = entry.getKey();
    +      if (entry.getValue().isRelative) {
    +        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
    +      }
    +      Path path = new Path(location);
    +      // Update the status to delete if path equals
    +      for (PartitionSpec spec : partitionSpecs) {
    +        if (path.equals(spec.getLocation())) {
    +          entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
    +          updateSegment = true;
    +          break;
    +        }
    +      }
    +    }
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
    +    writePath =
    +        writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
    +            + CarbonTablePath.SEGMENT_EXT;
    +    writeSegmentFile(segmentFile, writePath);
    +    // Check whether we can completely remove the segment.
    +    boolean deleteSegment = true;
    +    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
    +      if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
    +        deleteSegment = false;
    --- End diff --
   
    break the loop once deleteSegment is set to false
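    A minimal sketch of the suggested early exit (self-contained illustration only; FolderDetails and the status string are simplified stand-ins for the CarbonData classes):

    ```java
    import java.util.Arrays;
    import java.util.List;

    public class Main {
        // Simplified stand-in for SegmentFileStore.FolderDetails
        static class FolderDetails {
            final String status;
            FolderDetails(String status) { this.status = status; }
            String getStatus() { return status; }
        }

        // Returns true only when no folder is still in SUCCESS state.
        static boolean canDeleteSegment(List<FolderDetails> folders) {
            for (FolderDetails details : folders) {
                if ("Success".equals(details.getStatus())) {
                    // Short-circuit as suggested: one SUCCESS folder is enough
                    // to know the segment cannot be deleted.
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            List<FolderDetails> mixed = Arrays.asList(
                new FolderDetails("Marked for Delete"),
                new FolderDetails("Success"));
            List<FolderDetails> allDeleted = Arrays.asList(
                new FolderDetails("Marked for Delete"));
            System.out.println(canDeleteSegment(mixed));
            System.out.println(canDeleteSegment(allDeleted));
        }
    }
    ```

    The early return avoids scanning the remaining folders once the answer is known.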


---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169544845
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Writes segment information (index file names and corresponding
    +   * partitions) to the segment folder.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in JSON format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merges all segment files in a segment into a single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    +      }
    +      return segmentFile;
    +    }
    +    return null;
    +  }
    +
    +  private CarbonFile[] getSegmentFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    if (carbonFile.exists()) {
    +      return carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
    +        }
    +      });
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Provides a segment file only for the partitions which have physical index files.
    +   *
    +   * @param partitionSpecs
    +   */
    +  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
    +      List<PartitionSpec> partitionSpecs) {
    +    SegmentFile segmentFile = null;
    +    for (PartitionSpec spec : partitionSpecs) {
    +      String location = spec.getLocation().toString();
    +      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
    +      boolean isRelative = false;
    +      if (location.startsWith(tablePath)) {
    +        location = location.substring(tablePath.length(), location.length());
    +        isRelative = true;
    +      }
    +      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
    +        }
    +      });
    +      if (listFiles != null && listFiles.length > 0) {
    +        SegmentFile localSegmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(spec.getPartitions());
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : listFiles) {
    +          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +            folderDetails.setMergeFileName(file.getName());
    +          } else {
    +            folderDetails.getFiles().add(file.getName());
    +          }
    +        }
    +        locationMap.put(location, folderDetails);
    +        localSegmentFile.setLocationMap(locationMap);
    +        if (segmentFile == null) {
    +          segmentFile = localSegmentFile;
    +        } else {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +    }
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * Reads the segment file, which is written in JSON format
    +   *
    +   * @param segmentFilePath
    +   * @return
    +   */
    +  private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
    +    Gson gsonObjectToRead = new Gson();
    +    DataInputStream dataInputStream = null;
    +    BufferedReader buffReader = null;
    +    InputStreamReader inStream = null;
    +    SegmentFile segmentFile;
    +    AtomicFileOperations fileOperation =
    +        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
    +
    +    try {
    +      if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
    +        return null;
    +      }
    +      dataInputStream = fileOperation.openForRead();
    +      inStream = new InputStreamReader(dataInputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
    +      buffReader = new BufferedReader(inStream);
    +      segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
    +    } finally {
    +      if (inStream != null) {
    +        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
    +      }
    +    }
    +
    +    return segmentFile;
    +  }
    +
    +  /**
    +   * Reads segment file.
    +   */
    +  public void readSegment(String tablePath, String segmentFileName) throws IOException {
    +    String segmentFilePath =
    +        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
    +            + segmentFileName;
    +    SegmentFile segmentFile = readSegmentFile(segmentFilePath);
    +    this.tablePath = tablePath;
    +    this.segmentFile = segmentFile;
    +  }
    +
    +  public String getTablePath() {
    +    return tablePath;
    +  }
    +
    +  /**
    +   * Gets all the index files and related carbondata files from this segment. The user must
    +   * call @readIndexFiles before calling this method.
    +   * @return
    +   */
    +  public Map<String, List<String>> getIndexFilesMap() {
    +    return indexFilesMap;
    +  }
    +
    +  /**
    +   * Reads all index files located in this segment. The user must call
    +   * @readSegment before calling this method.
    +   * @throws IOException
    +   */
    +  public void readIndexFiles() throws IOException {
    +    readIndexFiles(SegmentStatus.SUCCESS, false);
    +  }
    +
    +  /**
    +   * Reads all index files matching the given segment status. If @ignoreStatus is true, it
    +   * reads all index files regardless of status.
    +   * @param status
    +   * @param ignoreStatus
    +   * @throws IOException
    +   */
    +  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
    +    if (indexFilesMap != null) {
    +      return;
    +    }
    +    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    +    indexFilesMap = new HashMap<>();
    +    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
    +    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
    +    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    +    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
    +      List<DataFileFooter> indexInfo =
    +          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
    +      List<String> blocks = new ArrayList<>();
    +      for (DataFileFooter footer : indexInfo) {
    +        blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
    +      }
    +      indexFilesMap.put(entry.getKey(), blocks);
    +    }
    +  }
    +
    +  /**
    +   * Gets all index files from this segment
    +   * @return
    +   */
    +  public Map<String, String> getIndexFiles() {
    +    Map<String, String> indexFiles = new HashMap<>();
    +    if (segmentFile != null) {
    +      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
    +        String location = entry.getKey();
    +        if (entry.getValue().isRelative) {
    +          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
    +        }
    +        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
    +          for (String indexFile : entry.getValue().getFiles()) {
    +            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
    +                entry.getValue().mergeFileName);
    +          }
    +        }
    +      }
    +    }
    +    return indexFiles;
    +  }
    +
    +  /**
    +   * Drops the partition-related entries from the segment file and writes the result to a
    +   * new file. It first iterates over the segment file, checks whether each path needs to be
    +   * dropped, and updates the status to deleted when a match is found.
    +   *
    +   * @param uniqueId
    +   * @throws IOException
    +   */
    +  public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
    +      String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
    +      throws IOException {
    +    readSegment(tablePath, segment.getSegmentFileName());
    +    boolean updateSegment = false;
    +    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
    +      String location = entry.getKey();
    +      if (entry.getValue().isRelative) {
    +        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
    +      }
    +      Path path = new Path(location);
    +      // Update the status to delete if path equals
    +      for (PartitionSpec spec : partitionSpecs) {
    +        if (path.equals(spec.getLocation())) {
    +          entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
    +          updateSegment = true;
    +          break;
    +        }
    +      }
    +    }
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
    +    writePath =
    +        writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
    +            + CarbonTablePath.SEGMENT_EXT;
    +    writeSegmentFile(segmentFile, writePath);
    +    // Check whether we can completely remove the segment.
    +    boolean deleteSegment = true;
    +    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
    +      if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
    +        deleteSegment = false;
    +      }
    +    }
    +    if (deleteSegment) {
    +      toBeDeletedSegments.add(segment.getSegmentId());
    +    }
    +    if (updateSegment) {
    +      toBeUpdatedSegments.add(segment.getSegmentId());
    +    }
    +  }
    +
    +  /**
    +   * Update the table status file with the dropped partitions information
    +   *
    +   * @param carbonTable
    +   * @param uniqueId
    +   * @param toBeUpdatedSegments
    +   * @param toBeDeleteSegments
    +   * @throws IOException
    +   */
    +  public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
    +      List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
    +    Set<Segment> segmentSet = new HashSet<>(
    --- End diff --
   
    Move segmentSet creation inside the below if check
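    The suggestion can be sketched like this (a hypothetical, simplified version of the method; buildSegmentSet, the counter, and the list parameters are illustrative stand-ins, not the CarbonData API):

    ```java
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class Main {
        // Counts how many times the set is actually built.
        static int buildCalls = 0;

        static Set<String> buildSegmentSet(List<String> ids) {
            buildCalls++;
            return new HashSet<>(ids);
        }

        static void commitDropPartitions(List<String> toBeUpdated, List<String> toBeDeleted) {
            // Per the review comment, create the set only when it is needed,
            // instead of unconditionally before the check.
            if (toBeUpdated.size() > 0 || toBeDeleted.size() > 0) {
                Set<String> segmentSet = buildSegmentSet(toBeUpdated);
                // ... update the table status using segmentSet ...
            }
        }

        public static void main(String[] args) {
            commitDropPartitions(Arrays.<String>asList(), Arrays.<String>asList());
            System.out.println(buildCalls); // nothing built for empty input
            commitDropPartitions(Arrays.asList("0"), Arrays.<String>asList());
            System.out.println(buildCalls);
        }
    }
    ```

    When both lists are empty the set is never constructed, which is the point of moving the creation inside the if check.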


---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3802/



---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2558/



---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3591/



---

[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1984
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3592/



---

[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169619680
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Represents one load of carbondata
    + */
    +public class Segment implements Serializable {
    +
    +  private static final long serialVersionUID = 7044555408162234064L;
    +
    +  private String segmentId;
    +
    +  private String segmentFileName;
    +
    +  public Segment(String segmentId, String segmentFileName) {
    +    this.segmentId = segmentId;
    +    this.segmentFileName = segmentFileName;
    +  }
    +
    +  public String getSegmentId() {
    +    return segmentId;
    +  }
    +
    +  public String getSegmentFileName() {
    +    return segmentFileName;
    +  }
    +
    +  public static List<Segment> toSegmentList(String[] segmentIds) {
    --- End diff --
   
    ok, renamed
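    For context, the conversion such a factory method performs can be sketched as below (Segment is simplified, and since the new method name is not shown in the thread, the original name is kept here):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class Main {
        // Simplified stand-in for org.apache.carbondata.core.datamap.Segment
        static class Segment {
            final String segmentId;
            final String segmentFileName;
            Segment(String segmentId, String segmentFileName) {
                this.segmentId = segmentId;
                this.segmentFileName = segmentFileName;
            }
        }

        static List<Segment> toSegmentList(String[] segmentIds) {
            List<Segment> list = new ArrayList<>(segmentIds.length);
            for (String id : segmentIds) {
                // The segment file name is unknown at this point, so it stays null.
                list.add(new Segment(id, null));
            }
            return list;
        }

        public static void main(String[] args) {
            List<Segment> segments = toSegmentList(new String[] {"0", "1"});
            System.out.println(segments.size());
            System.out.println(segments.get(0).segmentId);
        }
    }
    ```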


---