ajantha-bhat commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434412447

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based on the files present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Cumulative row counts up to each split, stored as a list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split-level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in a memory-based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize the superclass with no readers.
+    // Readers will be built per query, based on the splits identified for the pagination query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();

Review comment: sum is understandable. Two people have reviewed this before!
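For context, the constructor under discussion builds a running (prefix) sum of per-blocklet row counts, so that entry i holds the total rows in blocklets 0..i. A minimal standalone sketch of that idea, using hypothetical row counts in place of CarbonInputSplit detail info:

    import java.util.ArrayList;
    import java.util.List;

    public class PrefixSumSketch {
      public static void main(String[] args) {
        // Hypothetical per-blocklet row counts; in the reader these come from
        // ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount().
        long[] blockletRowCounts = {32000, 32000, 18000, 32000};
        List<Long> rowCountInSplits = new ArrayList<>(blockletRowCounts.length);
        long sum = 0;
        for (long rowCount : blockletRowCounts) {
          sum += rowCount; // running total up to and including this blocklet
          rowCountInSplits.add(sum);
        }
        // Prints [32000, 64000, 82000, 114000]; the last entry is the total row count.
        System.out.println(rowCountInSplits);
      }
    }

The last entry doubles as getTotalRows(), and because the cumulative array is sorted it later allows a binary search to map a row number to a blocklet index.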
ajantha-bhat commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434412721

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of the row counts in each blocklet;
+      // this is used for pruning with pagination values.
+      // At the current index, it contains the sum of the rows of all the blocklets, previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param fromRowNumber must be greater than 0 (as row id starts from 1)
+   *                      and less than or equal to toRowNumber
+   * @param toRowNumber must be greater than 0 (as row id starts from 1)

Review comment: ok
ajantha-bhat commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434413651

########## File path: sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java ##########

+/**
+ * Test suite for {@link CSVCarbonWriter}
+ */
+public class PaginationCarbonReaderTest {

Review comment: already there. Please check.
ajantha-bhat commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434414148

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+  /**
+   * Pagination query with from and to range.
+   *
+   * @param fromRowNumber must be greater than 0 (as row id starts from 1)
+   *                      and less than or equal to toRowNumber
+   * @param toRowNumber must be greater than 0 (as row id starts from 1),
+   *                    greater than or equal to fromRowNumber, and not outside the total rows
+   * @return array of rows between fromRowNumber and toRowNumber (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long fromRowNumber, long toRowNumber)
+      throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (fromRowNumber < 1) {
+      throw new IllegalArgumentException("from row id:" + fromRowNumber + " is less than 1");
+    }
+    if (fromRowNumber > toRowNumber) {
+      throw new IllegalArgumentException(
+          "from row id:" + fromRowNumber + " is greater than to row id:" + toRowNumber);
+    }
+    if (toRowNumber > getTotalRows()) {
+      throw new IllegalArgumentException(

Review comment: The user should know what to pass; getTotalRows() is provided for this reason. If they pass a wrong value by mistake, we should not assume and read until the end of the rows.
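To make the API contract being defended here concrete, a paging-loop sketch, assuming `reader` is an already-built PaginationCarbonReader<Object> and omitting exception handling; the page size is an arbitrary example value:

    // Row ids are 1-based and read(from, to) is inclusive on both ends.
    long totalRows = reader.getTotalRows();
    long pageSize = 1000; // arbitrary example value
    for (long from = 1; from <= totalRows; from += pageSize) {
      // The caller clips the last page itself: read() throws
      // IllegalArgumentException for a toRowNumber beyond getTotalRows()
      // rather than silently clamping to the end of the data.
      long to = Math.min(from + pageSize - 1, totalRows);
      Object[] page = reader.read(from, to);
      // ... process page ...
    }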
ajantha-bhat commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434414895

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    if (fromRowNumber > toRowNumber) {
+      throw new IllegalArgumentException(
+          "from row id:" + fromRowNumber + " is greater than to row id:" + toRowNumber);
+    }

Review comment: The argument itself is fromRowNumber; the error message is enough.

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    if (fromRowNumber < 1) {
+      throw new IllegalArgumentException("from row id:" + fromRowNumber + " is less than 1");
+    }

Review comment: The argument itself is fromRowNumber; the error message is enough.
CarbonDataQA1 commented on pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#issuecomment-638130343
akashrn5 commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434515260

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();

Review comment: `summationOfRowsInEachBlocklet` would be more meaningful, I guess.
akashrn5 commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434515638

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    if (fromRowNumber < 1) {
+      throw new IllegalArgumentException("from row id:" + fromRowNumber + " is less than 1");

Review comment: Just "from row id" in the error doesn't make much sense, right?
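For illustration only (not the wording the PR settled on), a message that names the parameter and its constraint explicitly would address this concern:

    if (fromRowNumber < 1) {
      // Hypothetical rewording: states which argument failed and why.
      throw new IllegalArgumentException(
          "fromRowNumber must be >= 1 (row ids start from 1), but was: " + fromRowNumber);
    }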
akashrn5 commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r434515732

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    if (fromRowNumber > toRowNumber) {
+      throw new IllegalArgumentException(
+          "from row id:" + fromRowNumber + " is greater than to row id:" + toRowNumber);

Review comment: same as above
ajantha-bhat commented on pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#issuecomment-638155466

retest this please
CarbonDataQA1 commented on pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#issuecomment-638234676

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3130/
CarbonDataQA1 commented on pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#issuecomment-638235543

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1406/
VenuReddy2103 commented on a change in pull request #3770: URL: https://github.com/apache/carbondata/pull/3770#discussion_r435146300

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);

Review comment: Any reason for handling the first split's row count separately instead of doing it in the for loop below, with the loop starting at i = 0? If it was unintentional, I suggest removing it from outside the loop and starting the loop from 0.
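Sketched out, the suggested refactor folds the first split into the loop by starting the running sum at 0; for a non-empty split list it is behavior-equivalent to the quoted constructor code:

    rowCountInSplits = new ArrayList<>(splits.size());
    long sum = 0;
    for (int i = 0; i < splits.size(); i++) {
      // summation array: entry i holds the total rows of blocklets 0..i
      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
      rowCountInSplits.add(sum);
    }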
VenuReddy2103 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r435181761

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+          // read the rows from the blocklet
+          // TODO: read blocklets in multi-thread if there is a performance requirement.
+          readerBuilder.setInputSplit(allBlockletSplits.get(i));
+          CarbonReader<Object> carbonReader = readerBuilder.build();
+          int count = 0;
+          while (carbonReader.hasNext()) {

Review comment: We don't continue with the next splits if the current split read fails?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
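As the PR stands, an IOException or InterruptedException thrown while reading any split propagates out of getRows(), so the whole read(fromRowNumber, toRowNumber) call fails. A minimal sketch of the continue-on-failure alternative the question implies; the skipUnreadableSplits flag and the readSplit() helper are hypothetical, not part of the PR:

  // Hypothetical continue-on-failure variant of the per-split loop in getRows().
  // Failing fast (the PR's current behavior) is arguably the safer default,
  // since a pagination result with silent gaps is worse than an error.
  for (int i = range.getFrom(); i <= range.getTo(); i++) {
    Object[] rowsInBlocklet;
    try {
      // readSplit() stands in for: setInputSplit, build reader, read all rows, close.
      rowsInBlocklet = readSplit(allBlockletSplits.get(i));
    } catch (IOException | InterruptedException e) {
      if (!skipUnreadableSplits) {
        throw e;      // fail fast, as the PR does today
      }
      continue;       // skip this blocklet; the returned page will be short
    }
    // ... merge rowsInBlocklet into the result array as in the PR ...
  }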
VenuReddy2103 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r435183240

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+          readerBuilder.setInputSplit(allBlockletSplits.get(i));
+          CarbonReader<Object> carbonReader = readerBuilder.build();
+          int count = 0;
+          while (carbonReader.hasNext()) {
+            rowsInBlocklet[count++] = carbonReader.readNextRow();
+          }
+          carbonReader.close();

Review comment: hasNext() seems to throw IOException and InterruptedException. Would it be better to enclose this while loop in a try/catch/finally block?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
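A minimal sketch of the guard being suggested, reusing the PR's own names from the surrounding getRows() context; only the try/finally around the loop is new:

  CarbonReader<Object> carbonReader = readerBuilder.build();
  try {
    int count = 0;
    while (carbonReader.hasNext()) {
      rowsInBlocklet[count++] = carbonReader.readNextRow();
    }
  } finally {
    // ensure the split-level reader is closed even if hasNext()/readNextRow() throws;
    // without this, a failed read leaks the reader's open file handles
    carbonReader.close();
  }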
VenuReddy2103 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r435258695

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+      long fromBlockletRow = blockletRows.getRowIdStartIndex();
+      long toBlockletRow = fromBlockletRow + blockletRows.getRowsCount();
+      Object[] rowsInBlocklet = blockletRows.getRows();
+      if (toRowNumber > toBlockletRow) {
+        if (fromRowNumber >= fromBlockletRow) {
+          // only fromRowNumber lies in this blocklet,
+          // read from fromRowNumber to end of the blocklet.
+          // -1 because row id starts from 0
+          int start = (int) (fromRowNumber - blockletRows.getRowIdStartIndex());
+          int end = blockletRows.getRowsCount();
+          while (start < end) {

Review comment: In all three partial-overlap cases we copy selectively, so only start and end change, but the while-loop copy itself is the same. How about refactoring so the copy is moved out to a common method? Just a suggestion, if it doesn't make the code harder to understand.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
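One way the suggested refactor could look: clamp the requested window to the blocklet once, then share a single copy. The copyRange() helper below is a hypothetical sketch, not code from the PR; it assumes srcStartRowId is the 1-based row id of the blocklet's first row, as BlockletRows.getRowIdStartIndex() provides:

  // Hypothetical helper collapsing the three overlap cases into one copy.
  // Returns how many rows were copied so the caller can advance its write offset.
  private static int copyRange(Object[] src, long srcStartRowId,
      long fromRowNumber, long toRowNumber, Object[] dest, int destPos) {
    // clamp the requested [fromRowNumber, toRowNumber] window to this blocklet's rows
    int start = (int) Math.max(0, fromRowNumber - srcStartRowId);
    int end = (int) Math.min(src.length, toRowNumber - srcStartRowId + 1);
    System.arraycopy(src, start, dest, destPos, end - start);
    return end - start;
  }

  // usage inside the blocklet loop of getRows():
  //   rowCount += copyRange(blockletRows.getRows(), blockletRows.getRowIdStartIndex(),
  //       fromRowNumber, toRowNumber, rows, rowCount);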
ajantha-bhat commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r435944365

########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java ##########

+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);

Review comment: Yeah, it was the initial code. Modified.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
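The construction under discussion builds a running total so that rowCountInSplits.get(i) holds the total row count up to and including blocklet i; findBlockletIndex() then binary-searches that list to map a 1-based row id to a blocklet index in O(log n). A self-contained toy illustration of that mapping (the row counts are made up, not real blocklet sizes):

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  public class PrefixSumLookupDemo {
    public static void main(String[] args) {
      int[] rowsPerBlocklet = {32000, 32000, 8000};  // toy values
      List<Long> rowCountInSplits = new ArrayList<>();
      long sum = 0;
      for (int rows : rowsPerBlocklet) {
        sum += rows;
        rowCountInSplits.add(sum);                   // becomes [32000, 64000, 72000]
      }
      // which blocklet holds row id 64001 (row ids are 1-based)?
      int index = Collections.binarySearch(rowCountInSplits, 64001L);
      if (index < 0) {
        // key not found: binarySearch returns -(insertionPoint) - 1,
        // and the insertion point is exactly the containing blocklet's index
        index = -(index + 1);
      }
      System.out.println(index);                     // prints 2 (the third blocklet)
    }
  }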