[GitHub] [carbondata] ajantha-bhat opened a new pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

ajantha-bhat opened a new pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770


    ### Why is this PR needed?
    Please refer to the design document attached to the JIRA issue.
    The CarbonData SDK currently doesn't support pagination.
   
    ### What changes were proposed in this PR?
    a) Support pagination in the Java SDK, with LRU cache support.
    b) Support pagination in the Python SDK by calling the Java SDK.
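
    As a rough illustration of the LRU caching idea in item a), here is a minimal, self-contained sketch. It is not the PR's `CarbonLRUCache` (which is sized in MB); the class name `BlockletRowsLruSketch` and the entry-count limit are assumptions made only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a blocklet-rows cache: evicts the least recently
// used entry once more than maxEntries entries are held.
final class BlockletRowsLruSketch<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;

  // Assumed limit expressed as an entry count, purely for illustration.
  private final int maxEntries;

  BlockletRowsLruSketch(int maxEntries) {
    // accessOrder = true makes iteration order follow recency of access.
    super(16, 0.75f, true);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Called after each put; returning true drops the least recently used entry.
    return size() > maxEntries;
  }
}
```

    With a limit of two entries, reading one cached blocklet and then adding a third evicts the blocklet that was not read most recently.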
       
    ### Does this PR introduce any user interface change?
    - No [Added new interfaces]
   
    ### Is any new testcase added?
    - Yes
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


[GitHub] [carbondata] ajantha-bhat commented on pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

ajantha-bhat commented on pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#issuecomment-631874964


   @jackylk, @xubo245: please check



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

CarbonDataQA1 commented on pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#issuecomment-631920232


   Build failed with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1329/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

CarbonDataQA1 commented on pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#issuecomment-631921958


   Build failed with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3049/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

CarbonDataQA1 commented on pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#issuecomment-633897508







[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432580839



##########
File path: docs/sdk-guide.md
##########
@@ -766,6 +771,39 @@ public VectorSchemaRoot getArrowVectors() throws IOException;
 public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes, BufferAllocator bufferAllocator) throws IOException;
 ```
 
+### Class org.apache.carbondata.sdk.file.PaginationCarbonReader
+```
+/**
+* Pagination query with from and to range.
+*
+* @param from must be greater than 0 and <= to
+* @param to must be >= from and not outside the total rows
+* @return array of rows between from and to (inclusive)
+* @throws Exception
+*/
+public Object[] read(long from, long to) throws IOException, InterruptedException;
+```
+
+```
+/**
+* Get total rows in the folder.

Review comment:
       Suggestion: reword this as "Get total rows in the folder or in a list of CarbonData files".
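
       For reference, the 1-based, inclusive range contract described in the Javadoc above can be modeled with a small stand-alone sketch. The class and method names (`PageRangeSketch`, `checkedRowCount`) are hypothetical and not part of the SDK; only the three checks mirror what the PR's `read(from, to)` documents.

```java
// Hypothetical stand-alone model of the documented read(from, to) contract.
final class PageRangeSketch {
  private PageRangeSketch() { }

  /**
   * Validates a 1-based inclusive row range against the total row count
   * and returns how many rows the range covers.
   */
  static long checkedRowCount(long from, long to, long totalRows) {
    if (from < 1) {
      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
    }
    if (from > to) {
      throw new IllegalArgumentException(
          "from row id:" + from + " is greater than to row id:" + to);
    }
    if (to > totalRows) {
      throw new IllegalArgumentException(
          "to row id:" + to + " is greater than total rows:" + totalRows);
    }
    // Both ends are inclusive, so rows 1..10 cover 10 rows.
    return to - from + 1;
  }
}
```

       Under this model a caller pages through a reader with ranges such as (1, 10), (11, 20), and so on, up to the value returned by `getTotalRows()`.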
   





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432601619



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {

Review comment:
       Why does the parameter name summationArray2 end with a 2?
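
       For context on what the summation array holds, here is a minimal sketch of building the running totals that the constructor above stores in `rowCountInSplits`. The class and method names are hypothetical, and plain `long` counts stand in for the values read from `CarbonInputSplit`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns per-blocklet row counts into running totals.
final class RunningTotalsSketch {
  private RunningTotalsSketch() { }

  static List<Long> runningTotals(long[] rowCounts) {
    List<Long> totals = new ArrayList<>(rowCounts.length);
    long sum = 0;
    for (long count : rowCounts) {
      // Entry i holds the number of rows in blocklets 0..i inclusive.
      sum += count;
      totals.add(sum);
    }
    return totals;
  }
}
```

       For blocklets with 3, 5 and 2 rows this gives [3, 8, 10]; the last entry is the total row count. Starting the sum at 0 also handles an empty input, whereas the constructor in the diff calls `splits.get(0)` and so appears to assume at least one split.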





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432602296



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);

Review comment:
       Please merge lines 145 and 146.
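
       One possible merged form, as a self-contained sketch rather than the final code: `Collections.binarySearch` returns `-(insertionPoint) - 1` when the key is absent, so the two statements collapse into a single expression. The wrapper class name `FindIndexSketch` is made up for the example.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical wrapper class; the method body is the part under review.
final class FindIndexSketch {
  private FindIndexSketch() { }

  static int findBlockletIndex(List<Long> summationArray, Long key) {
    // The summation array is sorted, so binary search applies.
    int index = Collections.binarySearch(summationArray, key);
    if (index < 0) {
      // Key not found: recover the insertion point in one step.
      index = -(index + 1);
    }
    return index;
  }
}
```

       With running totals [3, 8, 10], row id 4 is not an exact match and resolves to index 1, the blocklet that holds rows 4 to 8; row id 3 is an exact match and resolves to index 0.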





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432603190



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};

Review comment:
       Suggestion: create a new class and return an instance of it instead of an int array.
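
       A minimal sketch of what such a class could look like. The name `BlockletIndexRange` and its accessors are hypothetical, chosen only to illustrate the suggestion.

```java
// Hypothetical immutable holder for the blocklet index range of one page query.
final class BlockletIndexRange {
  private final int lowerBound;
  private final int upperBound;

  BlockletIndexRange(int lowerBound, int upperBound) {
    if (lowerBound < 0 || lowerBound > upperBound) {
      throw new IllegalArgumentException(
          "invalid blocklet range: " + lowerBound + " to " + upperBound);
    }
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  int getLowerBound() {
    return lowerBound;
  }

  int getUpperBound() {
    return upperBound;
  }

  // Both bounds are inclusive blocklet indexes.
  int blockletCount() {
    return upperBound - lowerBound + 1;
  }
}
```

       Compared with `new int[] {lowerBound, upperBound}`, named accessors remove the need for callers to remember which array slot holds which bound.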





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432603190



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};

Review comment:
       Suggestion: create a new class and return an instance of it instead of an int[].
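
A minimal sketch of what this suggestion could look like, assuming a small immutable value class. The name BlockletIndexRange and its accessors are hypothetical and not part of the PR; the lookup mirrors the binary search over the cumulative row counts in the quoted diff, but for simplicity it searches the full list for both bounds rather than a sub list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical value class; the name and accessors are assumptions, not part of the PR.
final class BlockletIndexRange {
  private final int from;
  private final int to;

  BlockletIndexRange(int from, int to) {
    this.from = from;
    this.to = to;
  }

  int getFrom() {
    return from;
  }

  int getTo() {
    return to;
  }

  // Index of the first cumulative count that is >= key,
  // i.e. the blocklet that holds the 1-based row id `key`.
  static int findBlockletIndex(List<Long> prefixSums, long key) {
    int index = Collections.binarySearch(prefixSums, key);
    // When the key is absent, binarySearch returns -(insertionPoint) - 1.
    return index >= 0 ? index : -(index + 1);
  }

  static BlockletIndexRange of(List<Long> prefixSums, long fromRow, long toRow) {
    return new BlockletIndexRange(
        findBlockletIndex(prefixSums, fromRow), findBlockletIndex(prefixSums, toRow));
  }

  public static void main(String[] args) {
    // Three blocklets with 10, 20 and 5 rows give cumulative counts 10, 30, 35.
    List<Long> prefixSums = Arrays.asList(10L, 30L, 35L);
    BlockletIndexRange range = BlockletIndexRange.of(prefixSums, 8, 31);
    System.out.println(range.getFrom() + ".." + range.getTo()); // prints 0..2
  }
}
```

With named accessors the caller no longer has to remember which array slot holds the lower bound and which holds the upper bound.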





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432604711



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];

Review comment:
       So the number of result rows must not exceed Integer.MAX_VALUE, i.e. to - from + 1 <= Integer.MAX_VALUE?
   
   Suggestion: limit the size of to - from + 1.
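
One way such a limit could be enforced is sketched below. The class and method names (PageBounds, checkedPageSize) are hypothetical, and the sketch folds the range checks from the quoted read() method into one helper purely for illustration.

```java
// Hypothetical helper; names are assumptions, not part of the PR.
final class PageBounds {

  // Returns the page length as an int, or throws if the range is invalid or too large.
  static int checkedPageSize(long from, long to, long totalRows) {
    if (from < 1 || from > to || to > totalRows) {
      throw new IllegalArgumentException(
          "invalid range: from row id:" + from + ", to row id:" + to);
    }
    // Cannot overflow here because 1 <= from <= to.
    long size = to - from + 1;
    if (size > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
          "page of " + size + " rows exceeds Integer.MAX_VALUE");
    }
    return (int) size;
  }

  public static void main(String[] args) {
    System.out.println(checkedPageSize(1, 10, 100)); // prints 10
  }
}
```

Without a check like this, the cast in `new Object[(int)(to - from + 1)]` silently truncates a page larger than Integer.MAX_VALUE. A much smaller, configurable cap may be preferable in practice, since a JVM may not be able to allocate an array near that size.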





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432604711



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];

Review comment:
       So the number of result rows must not exceed Integer.MAX_VALUE, i.e. to - from + 1 <= Integer.MAX_VALUE?
   
   Suggestion: limit the range of to - from + 1.





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432605600



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];
+    // get the matching split index (blocklets) range for the input range.
+    int[] blockletIndexRange = getBlockletIndexRange(from, to);
+    for (int i = blockletIndexRange[0]; i <= blockletIndexRange[1]; i++) {

Review comment:
       Suggestion: replace blockletIndexRange[0] with blockletIndexRange.getFrom(),
   
   and blockletIndexRange[1] with blockletIndexRange.getTo().





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432606525



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];
+    // get the matching split index (blocklets) range for the input range.
+    int[] blockletIndexRange = getBlockletIndexRange(from, to);
+    for (int i = blockletIndexRange[0]; i <= blockletIndexRange[1]; i++) {
+      String blockletUniqueId = String.valueOf(i);
+      BlockletRows blockletRows;
+      if (cache.get(blockletUniqueId) != null) {
+        blockletRows = (BlockletRows)cache.get(blockletUniqueId);
+      } else {
+        BlockletDetailInfo detailInfo =
+            ((CarbonInputSplit) allBlockletSplits.get(i)).getDetailInfo();
+        int rowCountInBlocklet = detailInfo.getRowCount();
+        Object[] rowsInBlocklet = new Object[rowCountInBlocklet];
+        // read the rows from the blocklet
+        // TODO: read blocklets in multi-thread if there is a performance requirement.

Review comment:
       How does pagination perform?
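
For context on this question: the quoted code checks `cache.get(blockletUniqueId)` before reading a blocklet, so repeated page reads over the same blocklets can skip the file read. The sketch below illustrates that idea only. It is a hypothetical stand-in built on java.util.LinkedHashMap and is not CarbonLRUCache; in particular it evicts by entry count, whereas the constants in the diff suggest the real cache is sized in MB.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; this is not CarbonLRUCache.
final class BlockletRowCache extends LinkedHashMap<String, Object[]> {
  private static final long serialVersionUID = 1L;

  private final int maxEntries;

  BlockletRowCache(int maxEntries) {
    // accessOrder = true keeps entries ordered from least to most recently used.
    super(16, 0.75f, true);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, Object[]> eldest) {
    // Called after each insertion; evict the least recently used entry when over capacity.
    return size() > maxEntries;
  }

  public static void main(String[] args) {
    BlockletRowCache cache = new BlockletRowCache(2);
    cache.put("0", new Object[] {"row-a"});
    cache.put("1", new Object[] {"row-b"});
    cache.get("0");                          // touch blocklet 0 so blocklet 1 becomes eldest
    cache.put("2", new Object[] {"row-c"});  // evicts blocklet 1
    System.out.println(cache.keySet());      // prints [0, 2]
  }
}
```

Whether caching alone is enough, or whether blocklets should also be read in multiple threads as the TODO in the diff notes, would need to be measured.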





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432606525



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];
+    // get the matching split index (blocklets) range for the input range.
+    int[] blockletIndexRange = getBlockletIndexRange(from, to);
+    for (int i = blockletIndexRange[0]; i <= blockletIndexRange[1]; i++) {
+      String blockletUniqueId = String.valueOf(i);
+      BlockletRows blockletRows;
+      if (cache.get(blockletUniqueId) != null) {
+        blockletRows = (BlockletRows)cache.get(blockletUniqueId);
+      } else {
+        BlockletDetailInfo detailInfo =
+            ((CarbonInputSplit) allBlockletSplits.get(i)).getDetailInfo();
+        int rowCountInBlocklet = detailInfo.getRowCount();
+        Object[] rowsInBlocklet = new Object[rowCountInBlocklet];
+        // read the rows from the blocklet
+        // TODO: read blocklets in multi-thread if there is a performance requirement.

Review comment:
       How is the pagination performance now?
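
For context on what a page lookup costs before any rows are read: the quoted code keeps a running total of row counts per blocklet and binary-searches it for the from and to row ids. Below is a minimal standalone sketch of that lookup, mirroring the quoted findBlockletIndex and getBlockletIndexRange. The class name BlockletRangeSketch, the helper names, and the sample row counts are made up for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in; not part of the CarbonData SDK.
class BlockletRangeSketch {

  // Running totals: entry i holds the number of rows in blocklets 0..i.
  static List<Long> cumulativeRowCounts(int[] rowsPerBlocklet) {
    List<Long> totals = new ArrayList<>(rowsPerBlocklet.length);
    long sum = 0;
    for (int rows : rowsPerBlocklet) {
      sum += rows;
      totals.add(sum);
    }
    return totals;
  }

  // Index of the blocklet that holds the given 1-based row id.
  static int findBlockletIndex(List<Long> totals, long rowId) {
    int index = Collections.binarySearch(totals, rowId);
    // On a miss, binarySearch returns -(insertionPoint) - 1;
    // the insertion point is the blocklet that contains the row.
    return index >= 0 ? index : -(index + 1);
  }

  // Inclusive range of blocklet indexes covering rows from..to.
  static int[] blockletRange(List<Long> totals, long from, long to) {
    int upper = findBlockletIndex(totals, to);
    // The lower bound cannot exceed the upper bound, so search only the prefix.
    int lower = findBlockletIndex(totals.subList(0, upper), from);
    return new int[] {lower, upper};
  }

  public static void main(String[] args) {
    // Three blocklets of 10 rows each give totals [10, 20, 30].
    List<Long> totals = cumulativeRowCounts(new int[] {10, 10, 10});
    int[] range = blockletRange(totals, 5, 28);
    System.out.println(range[0] + ".." + range[1]); // prints 0..2
  }
}
```

The lookup itself is logarithmic in the number of blocklets, so under this scheme the cost of a page is dominated by reading any blocklets that are not yet cached.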






[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432611327



##########
File path: python/pycarbon/sdk/PaginationCarbonReader.py
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class PaginationCarbonReader(object):
+  def __init__(self):
+    from jnius import autoclass
+    self.readerClass = autoclass('org.apache.carbondata.sdk.file.PaginationCarbonReader')
+
+  def builder(self, path, table_name):
+    self.PaginationCarbonReaderBuilder = self.readerClass.builder(path, table_name)
+    return self
+
+  def projection(self, projection_list):
+    self.PaginationCarbonReaderBuilder.projection(projection_list)
+    return self
+
+  def withHadoopConf(self, key, value):

Review comment:
       Please support temporary AK, SK, and security token in Python as well.





[GitHub] [carbondata] xubo245 commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

xubo245 commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r432611960



##########
File path: python/pycarbon/tests/sdk/test_read_write_carbon.py
##########
@@ -25,7 +26,8 @@
 import os
 import jnius_config
 
-jnius_config.set_classpath("../../../sdk/sdk/target/carbondata-sdk.jar")
+jnius_config.set_classpath("../../../../sdk/sdk/target/carbondata-sdk.jar")
+# jnius_config.add_options('-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6666')

Review comment:
       Please remove this commented-out debug option.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r433643240



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];

Review comment:
       Yes, a pagination query is meant to fetch only a few rows (one page) at a time.
   
   The total row count can exceed Integer.MAX_VALUE (hence long for from and to), but the number of rows requested by a single query should not exceed Integer.MAX_VALUE.
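
To make that limit explicit, here is a hedged sketch of a guard that could run before the result array is allocated. The quoted getRows casts (to - from + 1) to int directly; a page larger than Integer.MAX_VALUE would wrap around in that cast, and a negative result would make the array allocation throw NegativeArraySizeException. The class PageSizeSketch and the method pageSize are hypothetical names for illustration, not code from the PR.

```java
// Illustrative helper; PageSizeSketch and pageSize are hypothetical names, not part of the PR.
class PageSizeSketch {

  // Returns the number of rows in the inclusive range [from, to] as an int,
  // rejecting ranges whose size does not fit in an int.
  static int pageSize(long from, long to) {
    if (from < 1 || from > to) {
      throw new IllegalArgumentException("invalid range: " + from + " to " + to);
    }
    // Cannot overflow: from >= 1, so to - from + 1 <= to.
    long size = to - from + 1;
    if (size > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
          "page of " + size + " rows exceeds Integer.MAX_VALUE");
    }
    return (int) size;
  }

  public static void main(String[] args) {
    // Row ids may be beyond the int range as long as the page itself is small.
    System.out.println(pageSize(3_000_000_001L, 3_000_000_100L)); // prints 100
  }
}
```

With a check like this, an oversized request fails with a clear message instead of an allocation error.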





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3770: [CARBONDATA-3829] Support pagination in SDK reader

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3770:
URL: https://github.com/apache/carbondata/pull/3770#discussion_r433644058



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.file.cache.BlockletRows;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * CarbonData SDK reader with pagination support
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class PaginationCarbonReader<T> extends CarbonReader<T> {
+  // Splits based the file present in the reader path when the reader is built.
+  private List<InputSplit> allBlockletSplits;
+
+  // Rows till the current splits stored as list.
+  private List<Long> rowCountInSplits;
+
+  // Reader builder used to create the pagination reader, used for building split level readers.
+  private CarbonReaderBuilder readerBuilder;
+
+  private boolean isClosed;
+
+  // to store the rows of each blocklet in memory based LRU cache.
+  // key: unique blocklet id
+  // value: BlockletRows
+  private CarbonLRUCache cache =
+      new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB,
+          CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB_DEFAULT);
+
+  /**
+   * Call {@link #builder(String)} to construct an instance
+   */
+
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+    // Initialize super class with no readers.
+    // Based on the splits identified for pagination query, readers will be built for the query.
+    super(null);
+    this.allBlockletSplits = splits;
+    this.readerBuilder = readerBuilder;
+    // prepare the mapping.
+    rowCountInSplits = new ArrayList<>(splits.size());
+    long sum = ((CarbonInputSplit) splits.get(0)).getDetailInfo().getRowCount();
+    rowCountInSplits.add(sum);
+    for (int i = 1; i < splits.size(); i++) {
+      // prepare a summation array of row counts in each blocklet,
+      // this is used for pruning with pagination vales.
+      // At current index, it contains sum of rows of all the blocklet from previous + current.
+      sum += ((CarbonInputSplit) splits.get(i)).getDetailInfo().getRowCount();
+      rowCountInSplits.add(sum);
+    }
+  }
+
+  /**
+   * Pagination query with from and to range.
+   *
+   * @param from must be greater than 0 and <= to
+   * @param to must be >= from and not outside the total rows
+   * @return array of rows between from and to (inclusive)
+   * @throws Exception
+   */
+  public Object[] read(long from, long to) throws IOException, InterruptedException {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    if (from < 1) {
+      throw new IllegalArgumentException("from row id:" + from + " is less than 1");
+    }
+    if (from > to) {
+      throw new IllegalArgumentException(
+          "from row id:" + from + " is greater than to row id:" + to);
+    }
+    if (to > getTotalRows()) {
+      throw new IllegalArgumentException(
+          "to row id:" + to + " is greater than total rows:" + getTotalRows());
+    }
+    return getRows(from, to);
+  }
+
+  /**
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public long getTotalRows() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return rowCountInSplits.get(rowCountInSplits.size() - 1);
+  }
+
+  /**
+   * This interface is for python to call java.
+   * Because python cannot understand java Long object. so send string object.
+   *
+   * Get total rows in the folder.
+   * It is based on the snapshot of files taken while building the reader.
+   *
+   *
+   * @return total rows from all the files in the reader.
+   */
+  public String getTotalRowsAsString() {
+    if (isClosed) {
+      throw new RuntimeException("Pagination Reader is closed. please build again");
+    }
+    return (rowCountInSplits.get(rowCountInSplits.size() - 1)).toString();
+  }
+
+  private static int findBlockletIndex(List<Long> summationArray2, Long key) {
+    // summation array is in sorted order, so can use the binary search.
+    int index = Collections.binarySearch(summationArray2, key);
+    if (index < 0) {
+      // when key not found, binary search returns negative index [-1 to -N].
+      // which is the possible place where key can be inserted.
+      // with one shifted position. As 0 is also a valid index.
+      index = index + 1; // offset the one index shifted.
+      index = Math.abs(index);
+    }
+    return index;
+  }
+
+  private int[] getBlockletIndexRange(long from, long to) {
+    // find the matching blocklet index range by checking with from and to.
+    int upperBound = findBlockletIndex(rowCountInSplits, to);
+    // lower bound cannot be more than upper bound, so work on sub list.
+    int lowerBound = findBlockletIndex(rowCountInSplits.subList(0, upperBound), from);
+    return new int[] {lowerBound, upperBound};
+  }
+
+  private Object[] getRows(long from, long to)
+      throws IOException, InterruptedException {
+    int rowCount = 0;
+    Object[] rows = new Object[(int)(to - from + 1)];
+    // get the matching split index (blocklets) range for the input range.
+    int[] blockletIndexRange = getBlockletIndexRange(from, to);
+    for (int i = blockletIndexRange[0]; i <= blockletIndexRange[1]; i++) {
+      String blockletUniqueId = String.valueOf(i);
+      BlockletRows blockletRows;
+      if (cache.get(blockletUniqueId) != null) {
+        blockletRows = (BlockletRows)cache.get(blockletUniqueId);
+      } else {
+        BlockletDetailInfo detailInfo =
+            ((CarbonInputSplit) allBlockletSplits.get(i)).getDetailInfo();
+        int rowCountInBlocklet = detailInfo.getRowCount();
+        Object[] rowsInBlocklet = new Object[rowCountInBlocklet];
+        // read the rows from the blocklet
+        // TODO: read blocklets in multi-thread if there is a performance requirement.

Review comment:
       Tested on the local file system. Once the cache is loaded, reading a few rows (out of a million rows) is very fast (within 1 second).
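
To illustrate why repeated pages are cheap once a blocklet is cached, here is a simplified sketch of the cache-or-load step. It is an assumption-laden stand-in: the PR uses CarbonLRUCache, which is bounded by memory size, while this sketch swaps in a java.util.LinkedHashMap in access order bounded by entry count. The class BlockletCacheSketch, the loader function, and the loads counter are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for the blocklet row cache; not the PR's CarbonLRUCache.
class BlockletCacheSketch {
  private final Map<String, Object[]> cache;
  private final Function<String, Object[]> loader;
  int loads = 0; // counts how often a blocklet had to be read from storage

  BlockletCacheSketch(final int maxEntries, Function<String, Object[]> loader) {
    this.loader = loader;
    // accessOrder = true makes iteration order least-recently-used first.
    this.cache = new LinkedHashMap<String, Object[]>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Object[]> eldest) {
        // Evict the least recently used blocklet once the bound is exceeded.
        return size() > maxEntries;
      }
    };
  }

  // Returns the rows of a blocklet, reading them only on a cache miss.
  Object[] rowsOf(String blockletId) {
    Object[] rows = cache.get(blockletId); // single lookup, also refreshes recency
    if (rows == null) {
      rows = loader.apply(blockletId);
      loads++;
      cache.put(blockletId, rows);
    }
    return rows;
  }

  public static void main(String[] args) {
    BlockletCacheSketch sketch =
        new BlockletCacheSketch(2, id -> new Object[] {"rows of blocklet " + id});
    sketch.rowsOf("0");
    sketch.rowsOf("0"); // served from the cache, no second load
    System.out.println(sketch.loads); // prints 1
  }
}
```

Only the first query touching a blocklet pays the read cost; later pages that fall in the same blocklet are served from memory until that blocklet is evicted.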



