Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147675377 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapInfo.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap.dev; + +/** + * Information required to build datamap + */ +public class DataMapInfo { --- End diff -- Please rename to `DataMapLoadModel` as it is the input parameter for datamap loading --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147676934 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java --- @@ -84,15 +92,36 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache @Override public List<BlockletDataMap> getAll( List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); + List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>(); try { --- End diff -- Can you add some comment for this try block --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147677463 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapInfo.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import org.apache.carbondata.core.datamap.dev.DataMapInfo; + +public class BlockletDataMapInfo extends DataMapInfo { --- End diff -- rename to `BlockletDataMapLoadModel` --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147677650 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.MergedBlockIndex; +import org.apache.carbondata.format.MergedBlockIndexHeader; + +import org.apache.thrift.TBase; + +/** + * This class manages reading of index files with in the segment. The files it read can be + * carbonindex or carbonindexmerge files. + */ +public class SegmentIndexFileStore { + + private Map<String, byte[]> carbonIndexMap; + + public SegmentIndexFileStore() throws IOException { + carbonIndexMap = new HashMap<>(); + } + + /** + * Read all index files and keep the cache in it. + * + * @param segmentPath + * @throws IOException + */ + public void readAllIIndexOfSegment(String segmentPath) throws IOException { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + readMergeFile(carbonIndexFiles[i].getCanonicalPath()); + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + readIndexFile(carbonIndexFiles[i]); + } + } + } + + /** + * Read all index file names of the segment + * + * @param segmentPath + * @return + * @throws IOException + */ + public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + Set<String> indexFiles = new HashSet<>(); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath())); + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + indexFiles.add(carbonIndexFiles[i].getName()); + } + } + return new ArrayList<>(indexFiles); + } + + /** + * List all the index files inside merge file. 
+ * @param mergeFile + * @return + * @throws IOException + */ + public List<String> getIndexFilesFromMergeFile(String mergeFile) throws IOException { + List<String> indexFiles = new ArrayList<>(); + ThriftReader thriftReader = new ThriftReader(mergeFile); + thriftReader.open(); + MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); + List<String> file_names = indexHeader.getFile_names(); + indexFiles.addAll(file_names); + thriftReader.close(); + return indexFiles; + } + + /** + * Read carbonindexmerge file and update the map + * + * @param mergeFilePath + * @throws IOException + */ + private void readMergeFile(String mergeFilePath) throws IOException { + ThriftReader thriftReader = new ThriftReader(mergeFilePath); + thriftReader.open(); + MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); + MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); + List<String> file_names = indexHeader.getFile_names(); + List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); + assert (file_names.size() == fileData.size()); + for (int i = 0; i < file_names.size(); i++) { + carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + } + thriftReader.close(); + } + + /** + * Read carbonindex file and convert to stream and add to map + * + * @param indexFile + * @throws IOException + */ + private void readIndexFile(CarbonFile indexFile) throws IOException { + String indexFilePath = indexFile.getCanonicalPath(); + DataInputStream dataInputStream = + FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath)); + byte[] bytes = new byte[(int) indexFile.getSize()]; + dataInputStream.readFully(bytes); + carbonIndexMap.put(indexFile.getName(), bytes); + } + + private MergedBlockIndexHeader readMergeBlockIndexHeader(ThriftReader thriftReader) + throws IOException { + return (MergedBlockIndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new MergedBlockIndexHeader(); + } + }); + } + + private MergedBlockIndex readMergeBlockIndex(ThriftReader thriftReader) throws IOException { + return (MergedBlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new MergedBlockIndex(); + } + }); + } + + /** + * Get the carbonindex file content + * + * @param fileName + * @return + */ + public byte[] getFileData(String fileName) { + return carbonIndexMap.get(fileName); + } + + /** + * List all the index files of the segment. + * + * @param segmentPath + * @return + */ + public static CarbonFile[] getCarbonIndexFiles(String segmentPath) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() + .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT); + } + }); + } + + /** + * Return the map tht contain index file name and content of the file. --- End diff -- typo tht --- |
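A minimal usage sketch of the `SegmentIndexFileStore` shown in the diff above, based only on the methods visible there (`readAllIIndexOfSegment`, `getIndexFilesFromSegment`, `getFileData`). The segment path is a hypothetical placeholder; real callers derive it via `CarbonTablePath.getSegmentPath(...)`.

```scala
// Usage sketch only; assumes the SegmentIndexFileStore API exactly as in the diff above.
import scala.collection.JavaConverters._

import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore

object SegmentIndexFileStoreSketch {
  def main(args: Array[String]): Unit = {
    val segmentPath = "/tmp/store/default/t1/Fact/Part0/Segment_0" // hypothetical path

    // Read every .carbonindex and .carbonindexmerge file in the segment and
    // cache the raw bytes keyed by the .carbonindex file name.
    val fileStore = new SegmentIndexFileStore()
    fileStore.readAllIIndexOfSegment(segmentPath)

    // List the logical index file names; merge files are expanded into the
    // .carbonindex names recorded in their header.
    val indexFiles = fileStore.getIndexFilesFromSegment(segmentPath).asScala

    // Fetch the cached content of each index file from the in-memory map.
    indexFiles.foreach { name =>
      val bytes: Array[Byte] = fileStore.getFileData(name)
      println(s"$name -> ${bytes.length} bytes")
    }
  }
}
```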
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1436 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/748/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147680151 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.MergedBlockIndex; +import org.apache.carbondata.format.MergedBlockIndexHeader; + +import org.apache.thrift.TBase; + +/** + * This class manages reading of index files with in the segment. The files it read can be + * carbonindex or carbonindexmerge files. + */ +public class SegmentIndexFileStore { + + private Map<String, byte[]> carbonIndexMap; --- End diff -- Add comment for this map --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147680430 --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java --- @@ -48,9 +48,11 @@ protected static final String CARBON_UPDATE_DELTA_EXT = ".updatedelta"; protected static final String DATA_PART_PREFIX = "part-"; protected static final String BATCH_PREFIX = "_batchno"; - protected static final String INDEX_FILE_EXT = ".carbonindex"; protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta"; + public static final String INDEX_FILE_EXT = ".carbonindex"; --- End diff -- Can we change it to package private static member? And can you change the above member from line 36 to 51 also --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147680637 --- Diff: format/src/main/thrift/carbondata_index_merge.thrift --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * File format description for CarbonData merged index file. --- End diff -- please mention it is for one segment --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147681468 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} + +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter + +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String) + extends Partition { + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +class CarbonMergeFilesRDD( --- End diff -- Add comment to explain it is for merging the carbonindex file within one segment after the data loading --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147681655 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} + +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter + +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String) + extends Partition { + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +class CarbonMergeFilesRDD( + sc: SparkContext, + tablePath: String, + segments: Seq[String]) + extends CarbonRDD[String](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") --- End diff -- Is this for testing? Please remove if not required --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147682103 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} + +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter + +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String) + extends Partition { + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +class CarbonMergeFilesRDD( + sc: SparkContext, + tablePath: String, + segments: Seq[String]) + extends CarbonRDD[String](sc, Nil) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + + override def getPartitions: Array[Partition] = { + segments.zipWithIndex.map {s => + CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) + }.toArray + } + + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { + val iter = new Iterator[String] { + val split = theSplit.asInstanceOf[CarbonMergeFilePartition] + logInfo("Merging carbon index files of segment : " + split.segmentPath) + + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(split.segmentPath) --- End diff -- Instead of creating an RDD, why not just do rdd.mapPartition and invoke this function to merge index file? --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147682356 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} + +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter + +case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String) + extends Partition { + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +class CarbonMergeFilesRDD( --- End diff -- Instead of creating an RDD, why not just do rdd.mapPartition and invoke this function to merge index file? --- |
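The two comments above suggest dropping the dedicated RDD in favour of a plain `mapPartitions` call. A rough sketch of that alternative, reusing only the `CarbonTablePath.getSegmentPath` and `CarbonIndexFileMergeWriter.mergeCarbonIndexFilesOfSegment` calls already visible in the diff; the object name, method name, and slice count are illustrative.

```scala
// Sketch of the suggested alternative: parallelize the segment paths and invoke
// the merge writer inside mapPartitions instead of defining CarbonMergeFilesRDD.
import org.apache.spark.SparkContext

import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

object MergeIndexFilesSketch {
  def mergeIndexFiles(sc: SparkContext, tablePath: String, segments: Seq[String]): Unit = {
    val segmentPaths = segments.map(s => CarbonTablePath.getSegmentPath(tablePath, s))
    // One slice per segment so each segment's index files are merged by one task.
    sc.parallelize(segmentPaths, segmentPaths.size max 1)
      .mapPartitions { paths =>
        paths.map { segmentPath =>
          // Merges all .carbonindex files of this segment into one merge file.
          new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath)
          segmentPath
        }
      }
      .collect() // trigger the job; the returned paths are not needed
  }
}
```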
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147683214 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala --- @@ -106,6 +108,10 @@ object Compactor { } if (finalMergeStatus) { + val mergedLoadNumber = mergedLoadName --- End diff -- Can you add this logic in `LoadMetadataDetails.getLoadNumber` so that it can be used here? --- |
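A hedged sketch of the refactoring requested here. The diff is truncated right after `val mergedLoadNumber = mergedLoadName`, so the inlined conversion is not visible; the helper below only assumes it strips a `Segment_`-style prefix from the merged load name, and both the object name and the prefix handling are illustrative stand-ins for a static `LoadMetadataDetails.getLoadNumber`.

```scala
// Hypothetical helper illustrating the suggested LoadMetadataDetails.getLoadNumber.
// The prefix-stripping is an assumption: the actual inlined logic is cut off in the
// diff above, so this only shows where the logic would live, not what it must be.
object LoadNameUtil {
  private val SegmentPrefix = "Segment_" // assumed load-folder prefix

  def getLoadNumber(mergedLoadName: String): String = {
    if (mergedLoadName.startsWith(SegmentPrefix)) {
      mergedLoadName.substring(SegmentPrefix.length)
    } else {
      mergedLoadName
    }
  }
}

// The Compactor call site would then reduce to:
// val mergedLoadNumber = LoadNameUtil.getLoadNumber(mergedLoadName)
```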
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147689756 --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java --- @@ -1376,6 +1376,13 @@ public static final String BITSET_PIPE_LINE_DEFAULT = "true"; + /** + * It will merge the carbon index files with in the segment to single segment. --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690043 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java --- @@ -66,13 +70,17 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache segmentLockMap = new ConcurrentHashMap<String, Object>(); } - @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) + @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier) throws IOException { - String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(); + String lruCacheKey = identifier.getUniqueTableSegmentIdentifier(); BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey); if (dataMap == null) { try { - dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier); + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFileStore.readAllIIndexOfSegment(CarbonTablePath.getSegmentPath( --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690179 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapInfo.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap.dev; + +/** + * Information required to build datamap + */ +public class DataMapInfo { --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690658 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java --- @@ -84,15 +92,36 @@ public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache @Override public List<BlockletDataMap> getAll( List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); + List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>(); try { --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690705 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapInfo.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import org.apache.carbondata.core.datamap.dev.DataMapInfo; + +public class BlockletDataMapInfo extends DataMapInfo { --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690846 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.MergedBlockIndex; +import org.apache.carbondata.format.MergedBlockIndexHeader; + +import org.apache.thrift.TBase; + +/** + * This class manages reading of index files with in the segment. The files it read can be + * carbonindex or carbonindexmerge files. + */ +public class SegmentIndexFileStore { + + private Map<String, byte[]> carbonIndexMap; + + public SegmentIndexFileStore() throws IOException { + carbonIndexMap = new HashMap<>(); + } + + /** + * Read all index files and keep the cache in it. + * + * @param segmentPath + * @throws IOException + */ + public void readAllIIndexOfSegment(String segmentPath) throws IOException { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + readMergeFile(carbonIndexFiles[i].getCanonicalPath()); + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + readIndexFile(carbonIndexFiles[i]); + } + } + } + + /** + * Read all index file names of the segment + * + * @param segmentPath + * @return + * @throws IOException + */ + public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + Set<String> indexFiles = new HashSet<>(); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath())); + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + indexFiles.add(carbonIndexFiles[i].getName()); + } + } + return new ArrayList<>(indexFiles); + } + + /** + * List all the index files inside merge file. 
+ * @param mergeFile + * @return + * @throws IOException + */ + public List<String> getIndexFilesFromMergeFile(String mergeFile) throws IOException { + List<String> indexFiles = new ArrayList<>(); + ThriftReader thriftReader = new ThriftReader(mergeFile); + thriftReader.open(); + MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); + List<String> file_names = indexHeader.getFile_names(); + indexFiles.addAll(file_names); + thriftReader.close(); + return indexFiles; + } + + /** + * Read carbonindexmerge file and update the map + * + * @param mergeFilePath + * @throws IOException + */ + private void readMergeFile(String mergeFilePath) throws IOException { + ThriftReader thriftReader = new ThriftReader(mergeFilePath); + thriftReader.open(); + MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); + MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); + List<String> file_names = indexHeader.getFile_names(); + List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); + assert (file_names.size() == fileData.size()); + for (int i = 0; i < file_names.size(); i++) { + carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + } + thriftReader.close(); + } + + /** + * Read carbonindex file and convert to stream and add to map + * + * @param indexFile + * @throws IOException + */ + private void readIndexFile(CarbonFile indexFile) throws IOException { + String indexFilePath = indexFile.getCanonicalPath(); + DataInputStream dataInputStream = + FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath)); + byte[] bytes = new byte[(int) indexFile.getSize()]; + dataInputStream.readFully(bytes); + carbonIndexMap.put(indexFile.getName(), bytes); + } + + private MergedBlockIndexHeader readMergeBlockIndexHeader(ThriftReader thriftReader) + throws IOException { + return (MergedBlockIndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new MergedBlockIndexHeader(); + } + }); + } + + private MergedBlockIndex readMergeBlockIndex(ThriftReader thriftReader) throws IOException { + return (MergedBlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new MergedBlockIndex(); + } + }); + } + + /** + * Get the carbonindex file content + * + * @param fileName + * @return + */ + public byte[] getFileData(String fileName) { + return carbonIndexMap.get(fileName); + } + + /** + * List all the index files of the segment. + * + * @param segmentPath + * @return + */ + public static CarbonFile[] getCarbonIndexFiles(String segmentPath) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() + .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT); + } + }); + } + + /** + * Return the map tht contain index file name and content of the file. --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1436#discussion_r147690975 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.MergedBlockIndex; +import org.apache.carbondata.format.MergedBlockIndexHeader; + +import org.apache.thrift.TBase; + +/** + * This class manages reading of index files with in the segment. The files it read can be + * carbonindex or carbonindexmerge files. + */ +public class SegmentIndexFileStore { + + private Map<String, byte[]> carbonIndexMap; --- End diff -- ok --- |