Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169520945 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java --- @@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) { @Override public void freeMemory() { - + byteArrayData = null; --- End diff -- other types like intData, longData can also be used, so please set the references to null for all the types here --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169540773 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -79,6 +87,44 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException { } } + /** + * Read all index files and keep the cache in it. + * + * @param segmentFileStore + * @throws IOException + */ + public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status, + boolean ignoreStatus) throws IOException { + List<CarbonFile> carbonIndexFiles = new ArrayList<>(); + if (segmentFileStore.getLocationMap() == null) { + return; + } + for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore + .getLocationMap().entrySet()) { + String location = locations.getKey(); + if (locations.getValue().isRelative()) { + location = --- End diff -- This location variable fetching and value modification can be done after the below if check --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169522942 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java --- @@ -17,91 +17,59 @@ package org.apache.carbondata.core.indexstore; +import java.util.Objects; + import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; /** - * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment + * Class holds the absoluteTableIdentifier and segment to uniquely identify a segment --- End diff -- Modify the comment to remove absoluteTableIdentifier --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169537887 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -660,22 +650,23 @@ public boolean isScanRequired(FilterResolverIntf filterExp) { return blocklets; } - @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) { + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } - // First get the partitions which are stored inside datamap. - List<String> storedPartitions = getPartitions(); // if it has partitioned datamap but there is no partitioned information stored, it means // partitions are dropped so return empty list. - if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { - return new ArrayList<>(); - } - if (storedPartitions != null && storedPartitions.size() > 0) { + if (partitions != null) { + // First get the partitions which are stored inside datamap. + String[] fileDetails = getFileDetails(); // Check the exact match of partition information inside the stored partitions. boolean found = false; - if (partitions != null && partitions.size() > 0) { - found = partitions.containsAll(storedPartitions); + Path folderPath = new Path(fileDetails[0]); + for (PartitionSpec spec : partitions) { + if (folderPath.equals(spec.getLocation())) { + found = true; --- End diff -- break the loop once found --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169540997 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -161,9 +212,13 @@ private void readMergeFile(String mergeFilePath) throws IOException { MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); List<String> file_names = indexHeader.getFile_names(); List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); + CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath); assert (file_names.size() == fileData.size()); for (int i = 0; i < file_names.size(); i++) { carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + carbonIndexMapWithFullPath.put( + mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR + + file_names.get(i), fileData.get(i).array()); } thriftReader.close(); --- End diff -- use try and finally block and close the thrift reader inside finally block --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169518845 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentId; + + private String segmentFileName; + + public Segment(String segmentId, String segmentFileName) { + this.segmentId = segmentId; + this.segmentFileName = segmentFileName; + } + + public String getSegmentId() { + return segmentId; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.length); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static List<Segment> toSegmentList(List<String> segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.size()); + 
for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static Segment toSegment(String segmentId) { + String[] split = segmentId.split("#"); + if (split.length > 1) { + return new Segment(split[0], split[1]); + } else if (split.length > 0) { + return new Segment(split[0], null); + } + return new Segment(segmentId, null); + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Segment segment = (Segment) o; + return Objects.equals(segmentId, segment.segmentId); + } + + @Override public int hashCode() { + --- End diff -- Remove this extra line --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169518712 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentId; + + private String segmentFileName; + + public Segment(String segmentId, String segmentFileName) { + this.segmentId = segmentId; + this.segmentFileName = segmentFileName; + } + + public String getSegmentId() { + return segmentId; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.length); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static List<Segment> toSegmentList(List<String> segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.size()); + 
for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static Segment toSegment(String segmentId) { + String[] split = segmentId.split("#"); + if (split.length > 1) { + return new Segment(split[0], split[1]); + } else if (split.length > 0) { + return new Segment(split[0], null); + } + return new Segment(segmentId, null); --- End diff -- Please provide a comment for this method with an example covering the if, else if and else case --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169542459 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. + */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { --- End diff -- Move this complete if check inside below if check {if (carbonFiles != null && carbonFiles.length > 0)} --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3800/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2557/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169543287 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { --- End diff -- Move this if check after the below if check for {if (listFiles != null && listFiles.length > 0)} --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169543473 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); --- End diff -- Use try and finally here and call FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)) inside finally block --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544464 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; --- End diff -- Here we are directly updating the member variable. If it is called from multiple places then in concurrent operations it can cause problem. Kindly re-verify and if possible avoid updating the member variables if getting called from multiple places --- |
In reply to this post by qiuchenjian-2
GitHub user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544627 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; + } + + public String getTablePath() { + return tablePath; + } + + /** + * Gets all the index files and related carbondata files from this segment. First user needs to + * call @readIndexFiles method before calling it. + * @return + */ + public Map<String, List<String>> getIndexFilesMap() { + return indexFilesMap; + } + + /** + * Reads all index files which are located in this segment. First user needs to call + * @readSegment method before calling it. 
+ * @throws IOException + */ + public void readIndexFiles() throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false); + } + + /** + * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just + * reads all index files + * @param status + * @param ignoreStatus + * @throws IOException + */ + private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException { + if (indexFilesMap != null) { + return; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFilesMap = new HashMap<>(); + indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<String> blocks = new ArrayList<>(); + for (DataFileFooter footer : indexInfo) { + blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + indexFilesMap.put(entry.getKey(), blocks); + } + } + + /** + * Gets all index files from this segment + * @return + */ + public Map<String, String> getIndexFiles() { + Map<String, String> indexFiles = new HashMap<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { + for (String indexFile : entry.getValue().getFiles()) { + indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile, + entry.getValue().mergeFileName); + } + } + } + } + return indexFiles; + } + + /** + * Drops the partition related files from the segment file of the 
segment and writes + * to a new file. First iterator over segment file and check the path it needs to be dropped. + * And update the status with delete if it found. + * + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, + String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments) + throws IOException { + readSegment(tablePath, segment.getSegmentFileName()); + boolean updateSegment = false; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + Path path = new Path(location); + // Update the status to delete if path equals + for (PartitionSpec spec : partitionSpecs) { + if (path.equals(spec.getLocation())) { + entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage()); + updateSegment = true; + break; + } + } + } + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); + writePath = + writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId + + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, writePath); + // Check whether we can completly remove the segment. + boolean deleteSegment = true; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + deleteSegment = false; --- End diff -- break the loop once deleteSegment is set to false --- |
In reply to this post by qiuchenjian-2
GitHub user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544845 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import 
org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. 
+ */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; + } + + public String getTablePath() { + return tablePath; + } + + /** + * Gets all the index files and related carbondata files from this segment. First user needs to + * call @readIndexFiles method before calling it. + * @return + */ + public Map<String, List<String>> getIndexFilesMap() { + return indexFilesMap; + } + + /** + * Reads all index files which are located in this segment. First user needs to call + * @readSegment method before calling it. 
+ * @throws IOException + */ + public void readIndexFiles() throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false); + } + + /** + * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just + * reads all index files + * @param status + * @param ignoreStatus + * @throws IOException + */ + private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException { + if (indexFilesMap != null) { + return; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFilesMap = new HashMap<>(); + indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<String> blocks = new ArrayList<>(); + for (DataFileFooter footer : indexInfo) { + blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + indexFilesMap.put(entry.getKey(), blocks); + } + } + + /** + * Gets all index files from this segment + * @return + */ + public Map<String, String> getIndexFiles() { + Map<String, String> indexFiles = new HashMap<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { + for (String indexFile : entry.getValue().getFiles()) { + indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile, + entry.getValue().mergeFileName); + } + } + } + } + return indexFiles; + } + + /** + * Drops the partition related files from the segment file of the 
segment and writes + * to a new file. First iterator over segment file and check the path it needs to be dropped. + * And update the status with delete if it found. + * + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, + String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments) + throws IOException { + readSegment(tablePath, segment.getSegmentFileName()); + boolean updateSegment = false; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + Path path = new Path(location); + // Update the status to delete if path equals + for (PartitionSpec spec : partitionSpecs) { + if (path.equals(spec.getLocation())) { + entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage()); + updateSegment = true; + break; + } + } + } + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); + writePath = + writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId + + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, writePath); + // Check whether we can completly remove the segment. 
+ boolean deleteSegment = true; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + deleteSegment = false; + } + } + if (deleteSegment) { + toBeDeletedSegments.add(segment.getSegmentId()); + } + if (updateSegment) { + toBeUpdatedSegments.add(segment.getSegmentId()); + } + } + + /** + * Update the table status file with the dropped partitions information + * + * @param carbonTable + * @param uniqueId + * @param toBeUpdatedSegments + * @param toBeDeleteSegments + * @throws IOException + */ + public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId, + List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException { + Set<Segment> segmentSet = new HashSet<>( --- End diff -- Move segmentSet creation inside the below if check --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3802/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2558/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3591/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3592/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169619680 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentId; + + private String segmentFileName; + + public Segment(String segmentId, String segmentFileName) { + this.segmentId = segmentId; + this.segmentFileName = segmentFileName; + } + + public String getSegmentId() { + return segmentId; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { --- End diff -- ok, renamed --- |
Free forum by Nabble | Edit this page |