Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178594069 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java --- @@ -79,26 +80,27 @@ * * @param segments * @param filterExp + * @param readCommitted * @return */ public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp, - List<PartitionSpec> partitions) throws IOException { + List<PartitionSpec> partitions, ReadCommitted readCommitted) throws IOException { --- End diff -- ReadCommitted is implemented in order to set the read scope. Currently, for an Unmanaged Table the read scope is LatestFileRead, which takes a snapshot of the list of index files at the start of the query phase, i.e. in CarbonInputSplit. In later phases of the query life cycle, even if new files are placed, only the files captured in the initial snapshot are returned. Similarly, TableStatusReadCommitter takes a snapshot of the Table Status file and will be used by Managed Tables. A one-time snapshot of LoadMetadataDetails is taken in CarbonTableInputFormat, and afterwards only that Table Status snapshot is referred to when returning the list of files. So if the Table Status gets updated in parallel, the newly updated segments and file list won't be reflected. This is how we define the read commit scope. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2131 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4262/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4768/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3540/ --- |
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178714736 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -200,6 +216,90 @@ public void readAllIIndexOfSegment(CarbonFile[] carbonFiles) throws IOException return indexFiles; } + /** + * Read all index file names of the carbon File Path. + * + * @return + * @throws IOException + */ + public Map<String, String> getReadCommittedIndexFilesFromPath(String carbonFilePath) + throws IOException { + // Get only those index files which are mentioned in the + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(carbonFilePath, carbonIndexMap); + Map<String, String> indexFiles = new HashMap<>(); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + List<String> indexFilesFromMergeFile = + getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()); + for (String file : indexFilesFromMergeFile) { + indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath() + + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName()); + } + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null); + } + } + return indexFiles; + } + + /** + * Read all index file names of the segment + * + * @return + * @throws IOException + */ + public Map<String, String> getReadCommittedIndexFilesFromPath(String path, List<Segment> segments) + throws IOException { + // Only returns indexes matching the segment. + Map<String, String> indexFiles = new HashMap<>(); + for (Segment seg : segments) { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(path, carbonIndexMap); + for (int i = 0; i < carbonIndexFiles.length; i++) { --- End diff -- Duplicate logic across functions --- |
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178715161 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommitted.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.readcommitter; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +public class LatestFilesReadCommitted implements ReadCommitted { + + private String carbonFilePath; + private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; + + public LatestFilesReadCommitted(String path) { + this.carbonFilePath = path; + try { + takeCarbonIndexFileSnapShot(); + } catch (IOException ex) { + // TODO Put proper Log and throw the exception out. 
+ System.out.println("Error while reding index file"); + } + } + + @Override public LoadMetadataDetails[] getSegmentList() throws IOException { + // Read the Segment path and form the LoadMetadataDetails array. + File fs = new File(carbonFilePath); + + if (fs.isDirectory()) { + + CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); + LoadMetadataDetails[] loadMetadataDetailsArray = + new LoadMetadataDetails[carbonIndexFiles.length]; + int loadCount = 0; + for (int i = 0; i < carbonIndexFiles.length; i++) { --- End diff -- There can be multiple index files under the same logical segment/transaction ID. --- |
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178717012 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommitted.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.readcommitter; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +public class LatestFilesReadCommitted implements ReadCommitted { + + private String carbonFilePath; + private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; + + public LatestFilesReadCommitted(String path) { + this.carbonFilePath = path; + try { + takeCarbonIndexFileSnapShot(); + } catch (IOException ex) { + // TODO Put proper Log and throw the exception out. 
+ System.out.println("Error while reding index file"); + } + } + + @Override public LoadMetadataDetails[] getSegmentList() throws IOException { + // Read the Segment path and form the LoadMetadataDetails array. + File fs = new File(carbonFilePath); + + if (fs.isDirectory()) { + + CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); --- End diff -- This should read from readCommittedIndexFileSnapShot, not from the file system. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2131 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4272/ --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178732017 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommitted.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.readcommitter; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +public class LatestFilesReadCommitted implements ReadCommitted { + + private String carbonFilePath; + private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; + + public LatestFilesReadCommitted(String path) { + this.carbonFilePath = path; + try { + takeCarbonIndexFileSnapShot(); + } catch (IOException ex) { + // TODO Put proper Log and throw the exception out. 
+ System.out.println("Error while reding index file"); + } + } + + @Override public LoadMetadataDetails[] getSegmentList() throws IOException { + // Read the Segment path and form the LoadMetadataDetails array. + File fs = new File(carbonFilePath); + + if (fs.isDirectory()) { + + CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); --- End diff -- Currently the list is read from the file system and matched against readCommittedIndexFileSnapShot. A file is chosen only if it is present in readCommittedIndexFileSnapShot. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4774/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2131 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4277/ --- |
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2131 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4775/ --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178739522 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommitted.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.readcommitter; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +public class LatestFilesReadCommitted implements ReadCommitted { + + private String carbonFilePath; + private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; + + public LatestFilesReadCommitted(String path) { + this.carbonFilePath = path; + try { + takeCarbonIndexFileSnapShot(); + } catch (IOException ex) { + // TODO Put proper Log and throw the exception out. 
+ System.out.println("Error while reding index file"); + } + } + + @Override public LoadMetadataDetails[] getSegmentList() throws IOException { + // Read the Segment path and form the LoadMetadataDetails array. + File fs = new File(carbonFilePath); + + if (fs.isDirectory()) { + + CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); + LoadMetadataDetails[] loadMetadataDetailsArray = + new LoadMetadataDetails[carbonIndexFiles.length]; + int loadCount = 0; + for (int i = 0; i < carbonIndexFiles.length; i++) { --- End diff -- Rectified. --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178739559 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java --- @@ -200,6 +216,90 @@ public void readAllIIndexOfSegment(CarbonFile[] carbonFiles) throws IOException return indexFiles; } + /** + * Read all index file names of the carbon File Path. + * + * @return + * @throws IOException + */ + public Map<String, String> getReadCommittedIndexFilesFromPath(String carbonFilePath) + throws IOException { + // Get only those index files which are mentioned in the + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(carbonFilePath, carbonIndexMap); + Map<String, String> indexFiles = new HashMap<>(); + for (int i = 0; i < carbonIndexFiles.length; i++) { + if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + List<String> indexFilesFromMergeFile = + getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()); + for (String file : indexFilesFromMergeFile) { + indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath() + + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName()); + } + } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null); + } + } + return indexFiles; + } + + /** + * Read all index file names of the segment + * + * @return + * @throws IOException + */ + public Map<String, String> getReadCommittedIndexFilesFromPath(String path, List<Segment> segments) + throws IOException { + // Only returns indexes matching the segment. + Map<String, String> indexFiles = new HashMap<>(); + for (Segment seg : segments) { + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(path, carbonIndexMap); + for (int i = 0; i < carbonIndexFiles.length; i++) { --- End diff -- Removed --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178739606 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java --- @@ -83,28 +85,59 @@ public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) { } @Override - public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { + public List<CoarseGrainDataMap> getDataMaps(Segment segment, ReadCommitted readCommitted) + throws IOException { List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - getTableBlockIndexUniqueIdentifiers(segment); + getTableBlockIndexUniqueIdentifiers(segment, readCommitted); return cache.getAll(tableBlockIndexUniqueIdentifiers); } - private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers( - Segment segment) throws IOException { + private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment, + ReadCommitted readCommitted) throws IOException { List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { tableBlockIndexUniqueIdentifiers = new ArrayList<>(); + // TODO: integrate with ReadCommitted + // ReadCommitted readCommitted; + // if (job.getConfiguration().get(CARBON_UNMANAGED_TABLE).equalsIgnoreCase("true")) { + // updateStatusManager = null; + // readCommitted = new LatestFilesReadCommitted(identifier.getTablePath()); + // } else { + // loadMetadataDetails = SegmentStatusManager + // .readTableStatusFile(CarbonTablePath + // .getTableStatusFilePath(identifier.getTablePath())); + // updateStatusManager = + // new SegmentUpdateStatusManager(identifier, loadMetadataDetails); + // readCommitted = + // new TableStatusReadCommitted(job, this, loadMetadataDetails, updateStatusManager); + // } + // Map<String, String> indexFiles = readCommitted.getCommittedIndexList(segment); Map<String, String> indexFiles; - if 
(segment.getSegmentFileName() == null) { - String path = - CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); - indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); + if (CarbonUtil.isUnmanagedCarbonTable(identifier.getTablePath(), true)) { + if (null != readCommitted) { + indexFiles = readCommitted.getCommittedIndexMapSegments(); + } else { + indexFiles = + new SegmentIndexFileStore().getIndexFilesFromSegment(identifier.getTablePath()); + } } else { - SegmentFileStore fileStore = - new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); - indexFiles = fileStore.getIndexFiles(); + if (segment.getSegmentFileName() == null) { + + if (null != readCommitted) { + indexFiles = readCommitted.getCommittedIndexMapPerSegment(segment); --- End diff -- Done --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2131#discussion_r178739855 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java --- @@ -48,23 +48,39 @@ public CarbonTableBuilder tablePath(String tablePath) { return this; } + + public CarbonTableBuilder isUnManagedTable(boolean isUnManagedTable) { + Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null"); + this.unManagedTable = isUnManagedTable; + return this; + } + public CarbonTableBuilder tableSchema(TableSchema tableSchema) { Objects.requireNonNull(tableSchema, "tableSchema should not be null"); this.tableSchema = tableSchema; return this; } + public CarbonTableBuilder setUUID(long uuid) { --- End diff -- Removed --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on the issue:
https://github.com/apache/carbondata/pull/2131 Retest this please. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3550/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2131 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4777/ --- |
Free forum by Nabble | Edit this page |