[GitHub] carbondata pull request #1436: [WIP][CARBONDATA-1617] Merging carbonindex fi...

62 messages

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147675377
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapInfo.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap.dev;
    +
    +/**
    + * Information required to build datamap
    + */
    +public class DataMapInfo {
    --- End diff --
   
    Please rename it to `DataMapLoadModel`, as it is the input parameter for datamap loading.


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147676934
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---
    @@ -84,15 +92,36 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache
       @Override public List<BlockletDataMap> getAll(
           List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
         List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
    +    List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
         try {
    --- End diff --
   
    Can you add a comment for this try block?
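A comment here could describe the two-phase lookup that the new `missedIdentifiers` list suggests: serve identifiers from the LRU cache first, then load only the misses. The plain-Java sketch below illustrates that pattern under stated assumptions; `DataMapCacheSketch`, its `String` keys and values, and the `loader` function are hypothetical stand-ins for `TableBlockIndexUniqueIdentifier`, `BlockletDataMap`, and the PR's loading code, not CarbonData APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for BlockletDataMapIndexStore#getAll (not the PR's code). */
class DataMapCacheSketch {
  // Stand-in for the LRU cache: identifier -> loaded data map.
  private final Map<String, String> cache = new HashMap<>();
  // Stand-in for reading a segment's index files and building a data map.
  private final Function<String, String> loader;
  // How many times the loader ran, exposed only for illustration.
  int loadCount = 0;

  DataMapCacheSketch(Function<String, String> loader) {
    this.loader = loader;
  }

  List<String> getAll(List<String> identifiers) {
    Map<String, String> found = new HashMap<>();
    List<String> missed = new ArrayList<>();
    // Phase 1: serve what we can from the cache and remember the misses.
    for (String id : identifiers) {
      String cached = cache.get(id);
      if (cached != null) {
        found.put(id, cached);
      } else {
        missed.add(id);
      }
    }
    // Phase 2: load each missed identifier once and put it into the cache.
    for (String id : missed) {
      if (found.containsKey(id)) {
        continue; // duplicate identifier already loaded in this call
      }
      String loaded = loader.apply(id);
      loadCount++;
      cache.put(id, loaded);
      found.put(id, loaded);
    }
    // Return results in the same order as the requested identifiers.
    List<String> result = new ArrayList<>(identifiers.size());
    for (String id : identifiers) {
      result.add(found.get(id));
    }
    return result;
  }
}
```

Keeping the result in input order is a choice made in this sketch; whether the PR's `getAll` guarantees it is not visible in the quoted diff.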


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147677463
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapInfo.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import org.apache.carbondata.core.datamap.dev.DataMapInfo;
    +
    +public class BlockletDataMapInfo extends DataMapInfo {
    --- End diff --
   
    Please rename it to `BlockletDataMapLoadModel`.
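For illustration, the two renames requested in this review would leave a base load model and a blocklet-specific subclass. The fields below are hypothetical, since the quoted diff does not show the class bodies; only the class names come from the review comments.

```java
/** Base input for loading any datamap (renamed from DataMapInfo). */
class DataMapLoadModel {
  // Hypothetical field: path of the file the datamap is loaded from.
  private String filePath;

  String getFilePath() {
    return filePath;
  }

  void setFilePath(String filePath) {
    this.filePath = filePath;
  }
}

/** Input for loading a blocklet datamap (renamed from BlockletDataMapInfo). */
class BlockletDataMapLoadModel extends DataMapLoadModel {
  // Hypothetical field: raw carbonindex bytes, e.g. from SegmentIndexFileStore#getFileData.
  private byte[] fileData;

  byte[] getFileData() {
    return fileData;
  }

  void setFileData(byte[] fileData) {
    this.fileData = fileData;
  }
}
```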


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147677650
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.MergedBlockIndex;
    +import org.apache.carbondata.format.MergedBlockIndexHeader;
    +
    +import org.apache.thrift.TBase;
    +
    +/**
    + * This class manages reading of index files with in the segment. The files it read can be
    + * carbonindex or carbonindexmerge files.
    + */
    +public class SegmentIndexFileStore {
    +
    +  private Map<String, byte[]> carbonIndexMap;
    +
    +  public SegmentIndexFileStore() throws IOException {
    +    carbonIndexMap = new HashMap<>();
    +  }
    +
    +  /**
    +   * Read all index files and keep the cache in it.
    +   *
    +   * @param segmentPath
    +   * @throws IOException
    +   */
    +  public void readAllIIndexOfSegment(String segmentPath) throws IOException {
    +    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
    +    for (int i = 0; i < carbonIndexFiles.length; i++) {
    +      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +        readMergeFile(carbonIndexFiles[i].getCanonicalPath());
    +      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
    +        readIndexFile(carbonIndexFiles[i]);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Read all index file names of the segment
    +   *
    +   * @param segmentPath
    +   * @return
    +   * @throws IOException
    +   */
    +  public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException {
    +    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
    +    Set<String> indexFiles = new HashSet<>();
    +    for (int i = 0; i < carbonIndexFiles.length; i++) {
    +      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +        indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()));
    +      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
    +        indexFiles.add(carbonIndexFiles[i].getName());
    +      }
    +    }
    +    return new ArrayList<>(indexFiles);
    +  }
    +
    +  /**
    +   * List all the index files inside merge file.
    +   * @param mergeFile
    +   * @return
    +   * @throws IOException
    +   */
    +  public List<String> getIndexFilesFromMergeFile(String mergeFile) throws IOException {
    +    List<String> indexFiles = new ArrayList<>();
    +    ThriftReader thriftReader = new ThriftReader(mergeFile);
    +    thriftReader.open();
    +    MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
    +    List<String> file_names = indexHeader.getFile_names();
    +    indexFiles.addAll(file_names);
    +    thriftReader.close();
    +    return indexFiles;
    +  }
    +
    +  /**
    +   * Read carbonindexmerge file and update the map
    +   *
    +   * @param mergeFilePath
    +   * @throws IOException
    +   */
    +  private void readMergeFile(String mergeFilePath) throws IOException {
    +    ThriftReader thriftReader = new ThriftReader(mergeFilePath);
    +    thriftReader.open();
    +    MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
    +    MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
    +    List<String> file_names = indexHeader.getFile_names();
    +    List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
    +    assert (file_names.size() == fileData.size());
    +    for (int i = 0; i < file_names.size(); i++) {
    +      carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
    +    }
    +    thriftReader.close();
    +  }
    +
    +  /**
    +   * Read carbonindex file and convert to stream and add to map
    +   *
    +   * @param indexFile
    +   * @throws IOException
    +   */
    +  private void readIndexFile(CarbonFile indexFile) throws IOException {
    +    String indexFilePath = indexFile.getCanonicalPath();
    +    DataInputStream dataInputStream =
    +        FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath));
    +    byte[] bytes = new byte[(int) indexFile.getSize()];
    +    dataInputStream.readFully(bytes);
    +    carbonIndexMap.put(indexFile.getName(), bytes);
    +  }
    +
    +  private MergedBlockIndexHeader readMergeBlockIndexHeader(ThriftReader thriftReader)
    +      throws IOException {
    +    return (MergedBlockIndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
    +      @Override public TBase create() {
    +        return new MergedBlockIndexHeader();
    +      }
    +    });
    +  }
    +
    +  private MergedBlockIndex readMergeBlockIndex(ThriftReader thriftReader) throws IOException {
    +    return (MergedBlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
    +      @Override public TBase create() {
    +        return new MergedBlockIndex();
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Get the carbonindex file content
    +   *
    +   * @param fileName
    +   * @return
    +   */
    +  public byte[] getFileData(String fileName) {
    +    return carbonIndexMap.get(fileName);
    +  }
    +
    +  /**
    +   * List all the index files of the segment.
    +   *
    +   * @param segmentPath
    +   * @return
    +   */
    +  public static CarbonFile[] getCarbonIndexFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    return carbonFile.listFiles(new CarbonFileFilter() {
    +      @Override public boolean accept(CarbonFile file) {
    +        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
    +            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Return the map tht contain index file name and content of the file.
    --- End diff --
   
    Typo: `tht` should be `that`.
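Beyond the typo, two details in `readMergeFile` above may be worth a second look: the `assert` on the list sizes only runs when the JVM is started with `-ea`, and `ByteBuffer.array()` returns the entire backing array, which can be larger than one entry if the Thrift protocol hands back a slice of a shared buffer (whether it does depends on the transport in use). The sketch below is a defensive variant of the map-filling loop only; `MergeIndexMapSketch` and `toIndexMap` are hypothetical names, and the Thrift reading itself is left out.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper mirroring the loop in readMergeFile (not the PR's code). */
class MergeIndexMapSketch {
  static Map<String, byte[]> toIndexMap(List<String> fileNames, List<ByteBuffer> fileData) {
    // An explicit check still fires when assertions are disabled, unlike `assert`.
    if (fileNames.size() != fileData.size()) {
      throw new IllegalStateException("Merge index header lists " + fileNames.size()
          + " files but the body holds " + fileData.size() + " entries");
    }
    Map<String, byte[]> indexMap = new HashMap<>();
    for (int i = 0; i < fileNames.size(); i++) {
      // Copy only the remaining bytes of this entry; duplicate() leaves the
      // caller's buffer position untouched.
      ByteBuffer buffer = fileData.get(i).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      indexMap.put(fileNames.get(i), bytes);
    }
    return indexMap;
  }
}
```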


---

[GitHub] carbondata issue #1436: [CARBONDATA-1617] Merging carbonindex files within s...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1436
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/748/



---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147680151
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.MergedBlockIndex;
    +import org.apache.carbondata.format.MergedBlockIndexHeader;
    +
    +import org.apache.thrift.TBase;
    +
    +/**
    + * This class manages reading of index files with in the segment. The files it read can be
    + * carbonindex or carbonindexmerge files.
    + */
    +public class SegmentIndexFileStore {
    +
    +  private Map<String, byte[]> carbonIndexMap;
    --- End diff --
   
    Please add a comment for this map.
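One possible wording for the requested comment, based on how `readAllIIndexOfSegment` fills the map, is sketched below. The sketch also closes the input stream with try-with-resources, which the quoted `readIndexFile` does not do. `IndexFileStoreSketch` is a hypothetical, simplified stand-in: it takes an `InputStream` and a size instead of a `CarbonFile`, so it does not depend on `FileFactory`.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for SegmentIndexFileStore. */
class IndexFileStoreSketch {

  /**
   * Index file name (e.g. "xxx.carbonindex") mapped to the raw bytes of that file.
   * Entries come either from a standalone .carbonindex file or from one entry of a
   * .carbonindexmerge file in the same segment.
   */
  private final Map<String, byte[]> carbonIndexMap = new HashMap<>();

  /** Reads exactly {@code size} bytes and closes the stream even if reading fails. */
  void readIndexFile(String fileName, InputStream in, int size) throws IOException {
    try (DataInputStream dataInputStream = new DataInputStream(in)) {
      byte[] bytes = new byte[size];
      dataInputStream.readFully(bytes);
      carbonIndexMap.put(fileName, bytes);
    }
  }

  /** Returns the content of the given index file, or null if it was not read. */
  byte[] getFileData(String fileName) {
    return carbonIndexMap.get(fileName);
  }
}
```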


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147680430
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---
    @@ -48,9 +48,11 @@
       protected static final String CARBON_UPDATE_DELTA_EXT = ".updatedelta";
       protected static final String DATA_PART_PREFIX = "part-";
       protected static final String BATCH_PREFIX = "_batchno";
    -  protected static final String INDEX_FILE_EXT = ".carbonindex";
       protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
     
    +  public static final String INDEX_FILE_EXT = ".carbonindex";
    --- End diff --
   
    Can we change it to a package-private static member?
    Also, can you change the members above, from line 36 to 51, in the same way?
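Note that `SegmentIndexFileStore` lives in `org.apache.carbondata.core.indexstore.blockletindex` and reads `CarbonTablePath.INDEX_FILE_EXT` and `CarbonTablePath.MERGE_INDEX_FILE_EXT`, so making those constants package-private in `org.apache.carbondata.core.util.path` would break that access. One option, sketched below, keeps the constants package-private and exposes public predicates instead. `TablePathSketch`, `isCarbonIndexFile`, and `isMergeIndexFile` are hypothetical names, and the `.carbonindexmerge` value is an assumption based on the file names used in this PR; the sketch class itself is package-private only so it compiles in any file.

```java
/** Hypothetical sketch: raw extensions stay package-private, predicates are public. */
final class TablePathSketch {
  // Package-private: only classes in the same package see the raw extensions.
  static final String INDEX_FILE_EXT = ".carbonindex";
  // Assumed value, based on the "carbonindexmerge" file name used in this PR.
  static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";

  private TablePathSketch() {
  }

  /** Public check usable from other packages without exposing the constant. */
  public static boolean isCarbonIndexFile(String fileName) {
    return fileName.endsWith(INDEX_FILE_EXT);
  }

  /** Public check for merged index files. */
  public static boolean isMergeIndexFile(String fileName) {
    return fileName.endsWith(MERGE_INDEX_FILE_EXT);
  }
}
```

A file filter such as the one in `getCarbonIndexFiles` could then call these predicates rather than the constants.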


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147680637
 
    --- Diff: format/src/main/thrift/carbondata_index_merge.thrift ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/**
    + * File format description for CarbonData merged index file.
    --- End diff --
   
    Please mention that it is for one segment.


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147681468
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
    +
    +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String)
    +  extends Partition {
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +class CarbonMergeFilesRDD(
    --- End diff --
   
    Please add a comment explaining that it is for merging the carbonindex files within one segment after data loading.


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147681655
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
    +
    +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String)
    +  extends Partition {
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +class CarbonMergeFilesRDD(
    +    sc: SparkContext,
    +    tablePath: String,
    +    segments: Seq[String])
    +  extends CarbonRDD[String](sc, Nil) {
    +
    +  sc.setLocalProperty("spark.scheduler.pool", "DDL")
    --- End diff --
   
    Is this for testing? Please remove it if it is not required.


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147682103
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
    +
    +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String)
    +  extends Partition {
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +class CarbonMergeFilesRDD(
    +    sc: SparkContext,
    +    tablePath: String,
    +    segments: Seq[String])
    +  extends CarbonRDD[String](sc, Nil) {
    +
    +  sc.setLocalProperty("spark.scheduler.pool", "DDL")
    +
    +
    +  override def getPartitions: Array[Partition] = {
    +    segments.zipWithIndex.map {s =>
    +      CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
    +    }.toArray
    +  }
    +
    +  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
    +    val iter = new Iterator[String] {
    +      val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
    +      logInfo("Merging carbon index files of segment : " + split.segmentPath)
    +
    +      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(split.segmentPath)
    --- End diff --
   
    Instead of creating an RDD, why not just call `rdd.mapPartitions` and invoke this function to merge the index files?


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147682356
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
    +
    +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String)
    +  extends Partition {
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +class CarbonMergeFilesRDD(
    --- End diff --
   
    Instead of creating an RDD, why not just call `rdd.mapPartitions` and invoke this function to merge the index files?


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147683214
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---
    @@ -106,6 +108,10 @@ object Compactor {
         }
     
         if (finalMergeStatus) {
    +      val mergedLoadNumber = mergedLoadName
    --- End diff --
   
    Can you add this logic in `LoadMetadataDetails.getLoadNumber` so that it can be used here?
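The truncated diff line above starts deriving a load number from `mergedLoadName`. Assuming merged load names look like `Segment_0.1` (the prefix `Segment_` followed by the load number), a shared helper could look like the sketch below. `LoadNumberSketch` and `toLoadNumber` are hypothetical names; the real `LoadMetadataDetails` method may take a different shape.

```java
/** Hypothetical helper; not the actual LoadMetadataDetails API. */
final class LoadNumberSketch {
  // Assumed prefix of segment folder names, e.g. "Segment_0.1".
  static final String LOAD_FOLDER = "Segment_";

  private LoadNumberSketch() {
  }

  /** Returns the load number part of a load name, e.g. "Segment_0.1" -> "0.1". */
  static String toLoadNumber(String loadName) {
    int idx = loadName.lastIndexOf(LOAD_FOLDER);
    // Names without the prefix are assumed to already be plain load numbers.
    return idx < 0 ? loadName : loadName.substring(idx + LOAD_FOLDER.length());
  }
}
```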



---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147689756
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1376,6 +1376,13 @@
     
       public static final String BITSET_PIPE_LINE_DEFAULT = "true";
     
    +  /**
    +   * It will merge the carbon index files with in the segment to single segment.
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690043
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---
    @@ -66,13 +70,17 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache
         segmentLockMap = new ConcurrentHashMap<String, Object>();
       }
     
    -  @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
    +  @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
           throws IOException {
    -    String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
    +    String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
         BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
         if (dataMap == null) {
           try {
    -        dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
    +        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    +        indexFileStore.readAllIIndexOfSegment(CarbonTablePath.getSegmentPath(
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690179
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapInfo.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap.dev;
    +
    +/**
    + * Information required to build datamap
    + */
    +public class DataMapInfo {
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690658
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---
    @@ -84,15 +92,36 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache
       @Override public List<BlockletDataMap> getAll(
           List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
         List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
    +    List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
         try {
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690705
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapInfo.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import org.apache.carbondata.core.datamap.dev.DataMapInfo;
    +
    +public class BlockletDataMapInfo extends DataMapInfo {
    --- End diff --
   
    ok
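jackylk's earlier review asked for `DataMapInfo` to be renamed `DataMapLoadModel`, since it is the input parameter for datamap loading. The following is a minimal sketch of how the renamed base class and the blockletindex subclass could look. The `filePath` and `fileData` fields are hypothetical. They were chosen because `SegmentIndexFileStore` keeps index file contents in memory keyed by file name, so a loader could receive bytes that were already read.

```java
import java.util.Objects;

/**
 * Hypothetical base class for the input of a datamap load, named after its role
 * (a load model) as suggested in review.
 */
abstract class DataMapLoadModel {

  // Assumption: every datamap is loaded from some file path.
  private final String filePath;

  protected DataMapLoadModel(String filePath) {
    this.filePath = Objects.requireNonNull(filePath, "filePath");
  }

  public String getFilePath() {
    return filePath;
  }
}

/**
 * Hypothetical blocklet-specific model: besides the path it may carry index bytes
 * already read (for example, out of a carbonindexmerge file), so the datamap does
 * not have to open the index file again.
 */
final class BlockletDataMapLoadModel extends DataMapLoadModel {

  private final byte[] fileData;

  BlockletDataMapLoadModel(String filePath, byte[] fileData) {
    super(filePath);
    this.fileData = fileData;
  }

  /** Returns the pre-read index bytes, or null if the file must be read from storage. */
  byte[] getFileData() {
    return fileData;
  }
}
```

The subclass stores the array without copying it to avoid duplicating potentially large index contents. Callers are therefore expected not to mutate it.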


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690846
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.MergedBlockIndex;
    +import org.apache.carbondata.format.MergedBlockIndexHeader;
    +
    +import org.apache.thrift.TBase;
    +
    +/**
    + * This class manages reading of index files within the segment. The files it reads can be
    + * carbonindex or carbonindexmerge files.
    + */
    +public class SegmentIndexFileStore {
    +
    +  private Map<String, byte[]> carbonIndexMap;
    +
    +  public SegmentIndexFileStore() throws IOException {
    +    carbonIndexMap = new HashMap<>();
    +  }
    +
    +  /**
    +   * Read all index files and keep the cache in it.
    +   *
    +   * @param segmentPath
    +   * @throws IOException
    +   */
    +  public void readAllIIndexOfSegment(String segmentPath) throws IOException {
    +    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
    +    for (int i = 0; i < carbonIndexFiles.length; i++) {
    +      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +        readMergeFile(carbonIndexFiles[i].getCanonicalPath());
    +      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
    +        readIndexFile(carbonIndexFiles[i]);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Read all index file names of the segment
    +   *
    +   * @param segmentPath
    +   * @return
    +   * @throws IOException
    +   */
    +  public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException {
    +    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
    +    Set<String> indexFiles = new HashSet<>();
    +    for (int i = 0; i < carbonIndexFiles.length; i++) {
    +      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
    +        indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()));
    +      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
    +        indexFiles.add(carbonIndexFiles[i].getName());
    +      }
    +    }
    +    return new ArrayList<>(indexFiles);
    +  }
    +
    +  /**
    +   * List all the index files inside merge file.
    +   * @param mergeFile
    +   * @return
    +   * @throws IOException
    +   */
    +  public List<String> getIndexFilesFromMergeFile(String mergeFile) throws IOException {
    +    List<String> indexFiles = new ArrayList<>();
    +    ThriftReader thriftReader = new ThriftReader(mergeFile);
    +    thriftReader.open();
    +    MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
    +    List<String> file_names = indexHeader.getFile_names();
    +    indexFiles.addAll(file_names);
    +    thriftReader.close();
    +    return indexFiles;
    +  }
    +
    +  /**
    +   * Read carbonindexmerge file and update the map
    +   *
    +   * @param mergeFilePath
    +   * @throws IOException
    +   */
    +  private void readMergeFile(String mergeFilePath) throws IOException {
    +    ThriftReader thriftReader = new ThriftReader(mergeFilePath);
    +    thriftReader.open();
    +    MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
    +    MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
    +    List<String> file_names = indexHeader.getFile_names();
    +    List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
    +    assert (file_names.size() == fileData.size());
    +    for (int i = 0; i < file_names.size(); i++) {
    +      carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
    +    }
    +    thriftReader.close();
    +  }
    +
    +  /**
    +   * Read carbonindex file and convert to stream and add to map
    +   *
    +   * @param indexFile
    +   * @throws IOException
    +   */
    +  private void readIndexFile(CarbonFile indexFile) throws IOException {
    +    String indexFilePath = indexFile.getCanonicalPath();
    +    DataInputStream dataInputStream =
    +        FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath));
    +    byte[] bytes = new byte[(int) indexFile.getSize()];
    +    dataInputStream.readFully(bytes);
    +    carbonIndexMap.put(indexFile.getName(), bytes);
    +  }
    +
    +  private MergedBlockIndexHeader readMergeBlockIndexHeader(ThriftReader thriftReader)
    +      throws IOException {
    +    return (MergedBlockIndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
    +      @Override public TBase create() {
    +        return new MergedBlockIndexHeader();
    +      }
    +    });
    +  }
    +
    +  private MergedBlockIndex readMergeBlockIndex(ThriftReader thriftReader) throws IOException {
    +    return (MergedBlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
    +      @Override public TBase create() {
    +        return new MergedBlockIndex();
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Get the carbonindex file content
    +   *
    +   * @param fileName
    +   * @return
    +   */
    +  public byte[] getFileData(String fileName) {
    +    return carbonIndexMap.get(fileName);
    +  }
    +
    +  /**
    +   * List all the index files of the segment.
    +   *
    +   * @param segmentPath
    +   * @return
    +   */
    +  public static CarbonFile[] getCarbonIndexFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    return carbonFile.listFiles(new CarbonFileFilter() {
    +      @Override public boolean accept(CarbonFile file) {
    +        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
    +            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Return the map tht contain index file name and content of the file.
    --- End diff --
   
    ok
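Two details in `SegmentIndexFileStore` may be worth tightening. First, `readIndexFile` never closes the `DataInputStream` it opens. Second, `readMergeFile` guards the name/data count with `assert`, which is skipped unless the JVM runs with `-ea`. The sketch below shows both fixes using only the JDK. `java.nio.file` stands in for CarbonData's `FileFactory`/`CarbonFile`, and the class and method names are hypothetical. The sketch also copies each `ByteBuffer`'s remaining bytes instead of calling `array()`. `array()` returns the whole backing array regardless of position and throws for read-only or direct buffers. Whether Thrift ever returns such buffers here depends on the transport, so this is a defensive choice rather than a confirmed bug.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical JDK-only sketch of leak-free index reading and checked merge unpacking. */
final class IndexFileReadSketch {

  /** Reads a whole index file; try-with-resources closes the stream even on failure. */
  static byte[] readIndexFile(Path indexFile) throws IOException {
    long size = Files.size(indexFile);
    if (size > Integer.MAX_VALUE) {
      throw new IOException("Index file too large: " + indexFile);
    }
    byte[] bytes = new byte[(int) size];
    try (DataInputStream in = new DataInputStream(Files.newInputStream(indexFile))) {
      in.readFully(bytes);
    }
    return bytes;
  }

  /**
   * Pairs the file names from a merge header with their contents. The size check always
   * runs, unlike an assert.
   */
  static Map<String, byte[]> zipMergedEntries(List<String> fileNames, List<ByteBuffer> fileData) {
    if (fileNames.size() != fileData.size()) {
      throw new IllegalStateException("Corrupt merge index: " + fileNames.size()
          + " names but " + fileData.size() + " data entries");
    }
    Map<String, byte[]> result = new HashMap<>();
    for (int i = 0; i < fileNames.size(); i++) {
      // duplicate() leaves the caller's buffer position untouched.
      ByteBuffer buffer = fileData.get(i).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      result.put(fileNames.get(i), bytes);
    }
    return result;
  }
}
```

The same try-with-resources (or try/finally) shape would also close the `ThriftReader` in `readMergeFile` and `getIndexFilesFromMergeFile` when a read fails midway.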


---

[GitHub] carbondata pull request #1436: [CARBONDATA-1617] Merging carbonindex files w...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1436#discussion_r147690975
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.indexstore.blockletindex;
    +
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.MergedBlockIndex;
    +import org.apache.carbondata.format.MergedBlockIndexHeader;
    +
    +import org.apache.thrift.TBase;
    +
    +/**
    + * This class manages reading of index files within the segment. The files it reads can be
    + * carbonindex or carbonindexmerge files.
    + */
    +public class SegmentIndexFileStore {
    +
    +  private Map<String, byte[]> carbonIndexMap;
    --- End diff --
   
    ok
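On the `carbonIndexMap` field: the constructor only allocates a `HashMap`, and nothing in it can throw the declared `IOException`. The field could therefore be `final` and initialized at its declaration. The getter whose Javadoc closes the previous diff could also return a read-only view. The following is a hypothetical sketch. `IndexFileCacheSketch` and its `put` method are illustrative names, not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder showing a final map field and a read-only getter. */
final class IndexFileCacheSketch {

  // Final: the reference never changes after construction; only the contents do.
  private final Map<String, byte[]> carbonIndexMap = new HashMap<>();

  void put(String indexFileName, byte[] content) {
    carbonIndexMap.put(indexFileName, content);
  }

  /** Returns the content of one index file, or null if it was not read. */
  byte[] getFileData(String indexFileName) {
    return carbonIndexMap.get(indexFileName);
  }

  /** Read-only view: callers cannot add or remove entries (the byte arrays stay mutable). */
  Map<String, byte[]> getCarbonIndexMap() {
    return Collections.unmodifiableMap(carbonIndexMap);
  }
}
```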


---