|
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169619961 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.Path; + +/** + * Holds partition information. + */ +public class PartitionSpec implements Serializable { + + private static final long serialVersionUID = 4828007433384867678L; + + private List<String> partitions; --- End diff -- It signifies column=value, I have added comment --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169620760 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java --- @@ -185,26 +212,27 @@ public void clear(String segmentId) { @Override public void clear() { for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { - clear(segmentId); + clear(new Segment(segmentId, null)); } } @Override public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException { BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>(); - if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { - identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), - mapDistributable.getFilePath())); - } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + String indexPath = mapDistributable.getFilePath(); + if (indexPath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + String parent = indexPath.substring(0, indexPath.lastIndexOf("/")); --- End diff -- Ok, I have changed to HDFS Path and get parent from it. --- |
|
In reply to this post by qiuchenjian-2
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3803/ --- |
|
In reply to this post by qiuchenjian-2
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2559/ --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169626278 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. + */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; --- End diff -- Added in CarbonOutputCOmmiter abortjob --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169626625 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627117 --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java --- @@ -2449,5 +2496,40 @@ public static String encodeToString(byte[] bytes) throws UnsupportedEncodingExce return updatedMinMaxValues; } + /** + * Generate the blockid as per the block path + * + * @param identifier + * @param filePath + * @param segmentId + * @return + */ + public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath, + String segmentId) { + String blockId; + String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length()); + String tablePath = identifier.getTablePath(); + if (filePath.startsWith(tablePath)) { + String factDir = + CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier()) + .getFactDir(); + if (filePath.startsWith(factDir)) { + blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId + + CarbonCommonConstants.FILE_SEPARATOR + blockName; + } else { + String partitionDir = + filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1); + + blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" --- End diff -- Added comment --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627320 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentId; + + private String segmentFileName; + + public Segment(String segmentId, String segmentFileName) { + this.segmentId = segmentId; + this.segmentFileName = segmentFileName; + } + + public String getSegmentId() { + return segmentId; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.length); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static List<Segment> toSegmentList(List<String> segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.size()); + 
for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static Segment toSegment(String segmentId) { + String[] split = segmentId.split("#"); + if (split.length > 1) { + return new Segment(split[0], split[1]); + } else if (split.length > 0) { + return new Segment(split[0], null); + } + return new Segment(segmentId, null); --- End diff -- Added --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627375 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentId; + + private String segmentFileName; + + public Segment(String segmentId, String segmentFileName) { + this.segmentId = segmentId; + this.segmentFileName = segmentFileName; + } + + public String getSegmentId() { + return segmentId; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.length); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static List<Segment> toSegmentList(List<String> segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.size()); + 
for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static Segment toSegment(String segmentId) { + String[] split = segmentId.split("#"); + if (split.length > 1) { + return new Segment(split[0], split[1]); + } else if (split.length > 0) { + return new Segment(split[0], null); + } + return new Segment(segmentId, null); + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Segment segment = (Segment) o; + return Objects.equals(segmentId, segment.segmentId); + } + + @Override public int hashCode() { + --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627561 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java --- @@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) { @Override public void freeMemory() { - + byteArrayData = null; --- End diff -- In decimal page it only uses byteArrayData, so not required --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627883 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java --- @@ -17,91 +17,59 @@ package org.apache.carbondata.core.indexstore; +import java.util.Objects; + import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; /** - * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment + * Class holds the absoluteTableIdentifier and segment to uniquely identify a segment --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628065 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -660,22 +650,23 @@ public boolean isScanRequired(FilterResolverIntf filterExp) { return blocklets; } - @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) { + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } - // First get the partitions which are stored inside datamap. - List<String> storedPartitions = getPartitions(); // if it has partitioned datamap but there is no partitioned information stored, it means // partitions are dropped so return empty list. - if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { - return new ArrayList<>(); - } - if (storedPartitions != null && storedPartitions.size() > 0) { + if (partitions != null) { + // First get the partitions which are stored inside datamap. + String[] fileDetails = getFileDetails(); // Check the exact match of partition information inside the stored partitions. boolean found = false; - if (partitions != null && partitions.size() > 0) { - found = partitions.containsAll(storedPartitions); + Path folderPath = new Path(fileDetails[0]); + for (PartitionSpec spec : partitions) { + if (folderPath.equals(spec.getLocation())) { + found = true; --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628343 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -79,6 +87,44 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException { } } + /** + * Read all index files and keep the cache in it. + * + * @param segmentFileStore + * @throws IOException + */ + public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status, + boolean ignoreStatus) throws IOException { + List<CarbonFile> carbonIndexFiles = new ArrayList<>(); + if (segmentFileStore.getLocationMap() == null) { + return; + } + for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore + .getLocationMap().entrySet()) { + String location = locations.getKey(); + if (locations.getValue().isRelative()) { + location = --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628603 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -161,9 +212,13 @@ private void readMergeFile(String mergeFilePath) throws IOException { MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); List<String> file_names = indexHeader.getFile_names(); List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); + CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath); assert (file_names.size() == fileData.size()); for (int i = 0; i < file_names.size(); i++) { carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + carbonIndexMapWithFullPath.put( + mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR + + file_names.get(i), fileData.get(i).array()); } thriftReader.close(); --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628918 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. + */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169629127 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { --- End diff -- ok --- |
|
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169629728 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); --- End diff -- Failure case is handled in abortJob committer --- |
|
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169632391 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; --- End diff -- Ok, moved to constructor --- |
|
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984 SDV Build Success, please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3593/ --- |
|
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169635618 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; + } + + public String getTablePath() { + return tablePath; + } + + /** + * Gets all the index files and related carbondata files from this segment. First user needs to + * call @readIndexFiles method before calling it. + * @return + */ + public Map<String, List<String>> getIndexFilesMap() { + return indexFilesMap; + } + + /** + * Reads all index files which are located in this segment. First user needs to call + * @readSegment method before calling it. 
+ * @throws IOException + */ + public void readIndexFiles() throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false); + } + + /** + * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just + * reads all index files + * @param status + * @param ignoreStatus + * @throws IOException + */ + private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException { + if (indexFilesMap != null) { + return; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFilesMap = new HashMap<>(); + indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<String> blocks = new ArrayList<>(); + for (DataFileFooter footer : indexInfo) { + blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + indexFilesMap.put(entry.getKey(), blocks); + } + } + + /** + * Gets all index files from this segment + * @return + */ + public Map<String, String> getIndexFiles() { + Map<String, String> indexFiles = new HashMap<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { + for (String indexFile : entry.getValue().getFiles()) { + indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile, + entry.getValue().mergeFileName); + } + } + } + } + return indexFiles; + } + + /** + * Drops the partition related files from the segment file of the 
segment and writes + * to a new file. First iterator over segment file and check the path it needs to be dropped. + * And update the status with delete if it found. + * + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, + String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments) + throws IOException { + readSegment(tablePath, segment.getSegmentFileName()); + boolean updateSegment = false; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + Path path = new Path(location); + // Update the status to delete if path equals + for (PartitionSpec spec : partitionSpecs) { + if (path.equals(spec.getLocation())) { + entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage()); + updateSegment = true; + break; + } + } + } + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); + writePath = + writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId + + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, writePath); + // Check whether we can completly remove the segment. + boolean deleteSegment = true; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + deleteSegment = false; --- End diff -- ok --- |
| Free forum by Nabble | Edit this page |
