GitHub user NamanRastogi opened a pull request:
https://github.com/apache/carbondata/pull/2850

**Added concurrent reading through SDK**

Added a new API, `CarbonReader.split`, to enable concurrent reading of carbondata files through the SDK:

```java
List<CarbonReader> multipleReaders = reader.split(maxSplits);
```

For detailed information on how to use this API for concurrent reading, please refer to **ConcurrentSdkReaderTest.java**.

## Performance Metrics:

| | configured table block size: 1 MB | configured table block size: 10 MB | configured table block size: 100 MB |
| --- | --- | --- | --- |
| **# rows: 1e6**<br>**Store: 7.6 MB** | # files generated: 11<br>Sequential Read: 274 ms<br>Parallel Read: 123 ms | # files generated: 1<br>Sequential Read: 247 ms<br>Parallel Read: 248 ms | # files generated: 1<br>Sequential Read: 252 ms<br>Parallel Read: 254 ms |
| **# rows: 1e7**<br>**Store: 78 MB** | # files generated: 104<br>Sequential Read: 2685 ms<br>Parallel Read: 1230 ms | # files generated: 9<br>Sequential Read: 2499 ms<br>Parallel Read: 1357 ms | # files generated: 1<br>Sequential Read: 2527 ms<br>Parallel Read: 2597 ms |
| **# rows: 1e8**<br>**Store: 865 MB** | | # files generated: 95<br>Sequential Read: 27069 ms<br>Parallel Read: 16082 ms | # files generated: 15<br>Sequential Read: 25841 ms<br>Parallel Read: 13256 ms |

- [ ] Any interfaces changed?
- [x] Any backward compatibility impacted: No
- [ ] Document update required?
- [x] Testing done - a new unit test case has been added.
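As a rough illustration of the usage pattern referred to above (from **ConcurrentSdkReaderTest.java**), the sketch below drains each split reader in its own task and sums the row counts from the Futures. Plain `Iterator`s stand in for `CarbonReader`, since the SDK classes are not available here, and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the parallel-read pattern: one task per split reader,
// row counts collected via Futures. Iterators simulate split readers.
public class ParallelReadSketch {
  public static long readAll(List<Iterator<String>> splits, int numThreads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    List<Future<Long>> results = new ArrayList<>();
    for (Iterator<String> split : splits) {
      results.add(pool.submit(() -> {  // drain one split per task
        long rows = 0;
        while (split.hasNext()) {
          split.next();
          rows++;
        }
        return rows;
      }));
    }
    long total = 0;
    for (Future<Long> r : results) {
      total += r.get();  // sum per-split counts
    }
    pool.shutdown();
    return total;
  }

  public static void main(String[] args) throws Exception {
    // Simulate 4 splits of 250 rows each.
    List<Iterator<String>> splits = new ArrayList<>();
    for (int f = 0; f < 4; f++) {
      List<String> rows = new ArrayList<>();
      for (int i = 0; i < 250; i++) rows.add("row_" + f + "_" + i);
      splits.add(rows.iterator());
    }
    System.out.println(readAll(splits, 4)); // 1000
  }
}
```

The shape mirrors the test: the consumer owns the thread pool, and each split reader is read to exhaustion inside its own task.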
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NamanRastogi/carbondata sdk_reader

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/2850.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2850

----

commit 55383136232203ca9de97a9304033c20cf7085f8
Author: Naman Rastogi <naman.rastogi.52@...>
Date: 2018-10-18T12:54:23Z

    Added split for CarbonReader to enable multithreaded reading of carbondata files

commit 79871f291262a05a1970b765232bf2f43f75e5d5
Author: Naman Rastogi <naman.rastogi.52@...>
Date: 2018-10-22T14:07:06Z

    Added reader.close in CarbonSdkReaderTest

commit cd44ee7efbe09c46cd4f6b84c431261b18a13d3d
Author: Naman Rastogi <naman.rastogi.52@...>
Date: 2018-10-22T14:07:06Z

    Added reader.close in CarbonSdkReaderTest

commit 201d98ea157590c1d5f4decba89fcabae684c755
Author: Naman Rastogi <naman.rastogi.52@...>
Date: 2018-10-24T06:53:44Z

    Merge branch 'sdk_reader' of https://github.com/NamanRastogi/carbondata into sdk_reader

----
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2850 Can one of the admins verify this patch?

---
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2850

@NamanRastogi I think we can further optimize this function.

1. We can enable parallel reading and set the parallelism while creating a CarbonReader;
2. Inside CarbonReader, we handle the concurrent processing;
3. The interfaces of CarbonReader should be kept the same as before; there is no need to add more. By calling hasNext or next, the user gets the next record and does not need to care which RecordReader the record belongs to.

The user interface looks like below:

```java
CarbonReader reader = CarbonReader.builder(dataDir).parallelism(3).build();
while (reader.hasNext()) {
  reader.next();
}
reader.close();
```

To keep it simple, the default parallelism can be 1, which means we process each RecordReader one by one. Setting the parallelism to a higher value means we process the RecordReaders in a thread pool of that size.

---
Github user NamanRastogi commented on the issue:
https://github.com/apache/carbondata/pull/2850 @xuchuanyin Since `CarbonReader` is iterator based, we can only read a row when the user asks for one. So even if we set the parallelism internally via the builder, the files will still be read row by row (even though in different threads), and successive rows will still be read sequentially. Please look at the test file **ConcurrentSdkReaderTest.java**: the reading happens inside each thread, and multiple threads read the files (row by row) concurrently. The actual reading happens inside the `CarbonReader.readNextRow()` method; as long as that call is sequential, read performance will not improve, so we have to call `CarbonReader.readNextRow()` from different threads.

---
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2850 emm, but in your implementation, most of the work (the multi-thread handling) has to be done by the user. CarbonData itself only splits the input and returns multiple readers. If that is the solution, why not just tell the user to create multiple CarbonReaders, passing only part of the input dir each time? In addition to my proposal, I think we can add a buffer for the records. When `CarbonReader.next` is called, we retrieve the record from the buffer and refill the buffer asynchronously. When `CarbonReader.hasNext` is called, we first check the buffer; if it is empty, we check the underlying RecordReader and fill the buffer asynchronously.

---
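A minimal sketch of this buffered design, under stated assumptions: all names are hypothetical, and plain `Iterator`s stand in for `CarbonRecordReader`. Producer tasks fill a bounded buffer asynchronously; `hasNext`/`next` serve records from it:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffered reader: producers drain the record readers into a
// shared bounded queue, consumers call hasNext()/next() as before.
public class BufferedReaderSketch {
  private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<>(1024);
  private final ExecutorService pool;
  private final CountDownLatch producersDone;

  BufferedReaderSketch(List<Iterator<Object>> recordReaders, int parallelism) {
    pool = Executors.newFixedThreadPool(parallelism);
    producersDone = new CountDownLatch(recordReaders.size());
    for (Iterator<Object> rr : recordReaders) {
      pool.submit(() -> {
        try {
          while (rr.hasNext()) buffer.put(rr.next()); // fill asynchronously
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          producersDone.countDown();
        }
      });
    }
  }

  // True if a record is buffered or may still be produced.
  boolean hasNext() throws InterruptedException {
    while (buffer.isEmpty()) {
      // Once all producers finish, whatever is buffered is all that's left.
      if (producersDone.await(1, TimeUnit.MILLISECONDS)) return !buffer.isEmpty();
    }
    return true;
  }

  Object next() throws InterruptedException {
    return buffer.take();
  }

  void close() {
    pool.shutdown();
  }

  // Reads 3 simulated files of 100 rows each; returns the total row count.
  static long demo() throws InterruptedException {
    List<Iterator<Object>> readers = new ArrayList<>();
    for (int f = 0; f < 3; f++) {
      List<Object> rows = new ArrayList<>();
      for (int i = 0; i < 100; i++) rows.add("file" + f + "_row" + i);
      readers.add(rows.iterator());
    }
    BufferedReaderSketch reader = new BufferedReaderSketch(readers, 3);
    long count = 0;
    while (reader.hasNext()) {
      reader.next();
      count++;
    }
    reader.close();
    return count;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo());
  }
}
```

Note the trade-off raised later in the thread: with parallelism above 1, records from different files interleave in the buffer, so cross-file ordering is not preserved.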
Github user NamanRastogi commented on the issue:
https://github.com/apache/carbondata/pull/2850 @xuchuanyin With this API the overhead of reading concurrently is on the consumer of CarbonReader, so yes, what you said is right. But this API was designed precisely for the case where a user wants more manual control over concurrent reading. Consider the scenario where the user wants to read different files on different machines: if we take care of concurrent reading internally (using a buffer, as you suggested), the user cannot do that. Another thing you mentioned was the user passing only part of the input dir to each SDK reader, and that does not work in the current implementation of the SDK. As for faster reading, there is another pull request, [#2816](https://github.com/apache/carbondata/pull/2816), which supports batch reading for better read performance. It is still not concurrent reading, but it is better than iterator-based row-by-row reading.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r228706276 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +import junit.framework.TestCase; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOExceptionWithCause; +import org.junit.*; + +/** + * multi-thread Test suite for {@link CarbonReader} + */ +public class ConcurrentSdkReaderTest extends TestCase { + + private static final String dataDir = "./testReadFiles"; + + public void cleanTestData() { --- End diff -- you can add @Before or @After got it. especially there are many test case --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r228706363 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +import junit.framework.TestCase; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOExceptionWithCause; +import org.junit.*; + +/** + * multi-thread Test suite for {@link CarbonReader} + */ +public class ConcurrentSdkReaderTest extends TestCase { + + private static final String dataDir = "./testReadFiles"; + + public void cleanTestData() { + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private 
void writeTestData(long numRows, int tableBlockSize) { + cleanTestData(); + + Field[] fields = new Field[2]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + + Map<String, String> tableProperties = new HashMap<>(); + tableProperties.put("table_blocksize", Integer.toString(tableBlockSize)); + + CarbonWriterBuilder builder = + CarbonWriter.builder().outputPath(dataDir).withTableProperties(tableProperties) + .withCsvInput(new Schema(fields)); + + try { + CarbonWriter writer = builder.build(); + + for (long i = 0; i < numRows; ++i) { + writer.write(new String[] { "robot_" + i, String.valueOf(i) }); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test public void testReadParallely() throws IOException, InterruptedException { + long numRows = 10000000; + int tableBlockSize = 10; + short numThreads = 4; + writeTestData(numRows, tableBlockSize); + long count; + + CarbonReader reader = CarbonReader.builder(dataDir).build(); + try { + count = 0; + long start = System.currentTimeMillis(); + while (reader.hasNext()) { + reader.readNextRow(); + count += 1; + } + long end = System.currentTimeMillis(); + System.out.println("[Sequential read] Time:" + (end - start)); + Assert.assertEquals(numRows, count); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + reader.close(); + } + + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + CarbonReader reader2 = CarbonReader.builder(dataDir).build(); + try { + List<CarbonReader> multipleReaders = reader2.split(numThreads); + List<Future> results = new ArrayList<>(); + count = 0; + long start = System.currentTimeMillis(); + for (CarbonReader reader_i : multipleReaders) { + results.add(executorService.submit(new ReadLogic(reader_i))); + } + for (Future result_i : results) { + count += (long) result_i.get(); + } + long end = 
System.currentTimeMillis(); + System.out.println("[Parallel read] Time:" + (end - start)); --- End diff -- Please add a unit to the printed time, such as ms. ---
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/2850 @NamanRastogi Hi, a customer requires that carbon return the data in the same order whether it is read with one thread or with multiple threads.

---
Github user NamanRastogi commented on the issue:
https://github.com/apache/carbondata/pull/2850 Yes, data coming from one file will always be in order. Please check the `split` method: it splits the list of `CarbonRecordReader`s into multiple `CarbonReader`s. Suppose there are 10 carbondata files and the user wants 3 splits; he will get a list like this:

---
Github user NamanRastogi commented on the issue:
https://github.com/apache/carbondata/pull/2850

Please check the `split` method: it splits the list of `CarbonRecordReader`s into multiple `CarbonReader`s. It does not jumble the order of the `CarbonRecordReader`s; it keeps them sequential. Suppose there are 10 *carbondata* files, and thus 10 `CarbonRecordReader`s in the `CarbonReader.readers` object, and the user wants 3 splits:

```java
CarbonReader reader = CarbonReader.builder(dataDir).build();
List<CarbonReader> multipleReaders = reader.split(3);
```

The indices of the `CarbonRecordReader`s in `multipleReaders` will be:

`multipleReaders.get(0).readers` points to indices {0,1,2,3} of the *carbondata* files
`multipleReaders.get(1).readers` points to indices {4,5,6} of the *carbondata* files
`multipleReaders.get(2).readers` points to indices {7,8,9} of the *carbondata* files

Now, if you read the rows as in the following code, the rows will still be in order:

```java
for (CarbonReader reader_i : multipleReaders) {
  reader_i.readNextRow();
}
```

Earlier, you got data from the 5th `CarbonRecordReader` only after you had exhausted the 4th. Now you can get it earlier, maybe even before the 0th. So if order matters, the user has to make sure the 5th file is consumed only after the 4th is used up; if order does not matter, it can be consumed earlier as well. For example, to count the total number of rows, the user does not need the original order.

---
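The index assignment described here can be reproduced with the same ceil-based arithmetic the `split` method uses. This is a standalone sketch with hypothetical names, not SDK code:

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces the split-to-file-index assignment from the discussion:
// 10 files across 3 splits -> {0,1,2,3}, {4,5,6}, {7,8,9}.
public class SplitIndices {
  static List<List<Integer>> assign(int numFiles, int maxSplits) {
    List<List<Integer>> splits = new ArrayList<>();
    int n = Math.min(maxSplits, numFiles); // never more splits than files
    for (int i = 0; i < n; i++) {
      int start = (int) Math.ceil((float) (i * numFiles) / n);
      int end = (int) Math.ceil((float) ((i + 1) * numFiles) / n);
      List<Integer> part = new ArrayList<>();
      for (int j = start; j < end; j++) part.add(j);
      splits.add(part);
    }
    return splits;
  }

  public static void main(String[] args) {
    System.out.println(assign(10, 3)); // [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
  }
}
```

For 10 files and 3 splits this yields exactly the index groups listed in the comment, and consecutive splits always cover consecutive file ranges, which is why per-file order is preserved.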
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229179428 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Return a new list of {@link CarbonReader} objects + * + * @param maxSplits + */ + public List<CarbonReader> split(int maxSplits) throws IOException { --- End diff -- Need to add new interfaces exposed in sdk-guide.md. you can add this API info there. --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229179713 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Return a new list of {@link CarbonReader} objects + * + * @param maxSplits + */ + public List<CarbonReader> split(int maxSplits) throws IOException { + validateReader(); + if (maxSplits < 1) { + throw new RuntimeException( + this.getClass().getSimpleName() + ".split: maxSplits must be positive"); + } + + List<CarbonReader> carbonReaders = new ArrayList<>(); + + // If maxSplits < readers.size + // Split the reader into maxSplits splits with each + // element contains >= 1 CarbonRecordReader objects + if (maxSplits < this.readers.size()) { + for (int i = 0; i < maxSplits; ++i) { + carbonReaders.add(new CarbonReader<>(this.readers + .subList((int) Math.ceil((float) (i * this.readers.size()) / maxSplits), + (int) Math.ceil((float) ((i + 1) * this.readers.size()) / maxSplits)))); + } + } + // If maxSplits >= readers.size + // Split the reader into reader.size splits with each --- End diff -- keep comments inside else block for easy reading of the code --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229179978 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Return a new list of {@link CarbonReader} objects + * --- End diff -- Add a clear description, mention what happens if splits greater than the number of files and what happens if splits are lesser than the number of files --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229183179 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Return a new list of {@link CarbonReader} objects + * + * @param maxSplits + */ + public List<CarbonReader> split(int maxSplits) throws IOException { + validateReader(); + if (maxSplits < 1) { + throw new RuntimeException( + this.getClass().getSimpleName() + ".split: maxSplits must be positive"); + } + + List<CarbonReader> carbonReaders = new ArrayList<>(); + + // If maxSplits < readers.size + // Split the reader into maxSplits splits with each + // element contains >= 1 CarbonRecordReader objects + if (maxSplits < this.readers.size()) { + for (int i = 0; i < maxSplits; ++i) { + carbonReaders.add(new CarbonReader<>(this.readers + .subList((int) Math.ceil((float) (i * this.readers.size()) / maxSplits), --- End diff -- this is constant, do this outside loop and use it each time --- |
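One possible shape of the hoisting the reviewer suggests: compute the per-split step once outside the loop. Note this ceils `i * step` where the original ceils `(float) (i * n) / maxSplits`; for realistic reader counts the float results agree, but that equivalence is an assumption, and the names below are illustrative:

```java
// Sketch of the reviewer's suggestion: hoist the constant ratio out of
// the loop. Plain ints stand in for the reader list.
public class SplitBoundaries {
  public static int[][] boundaries(int numReaders, int maxSplits) {
    float step = (float) numReaders / maxSplits; // hoisted constant
    int[][] out = new int[maxSplits][2];
    for (int i = 0; i < maxSplits; ++i) {
      out[i][0] = (int) Math.ceil(i * step);       // subList start
      out[i][1] = (int) Math.ceil((i + 1) * step); // subList end (exclusive)
    }
    return out;
  }

  public static void main(String[] args) {
    for (int[] b : boundaries(10, 3)) {
      System.out.println(b[0] + ".." + b[1]); // 0..4, 4..7, 7..10
    }
  }
}
```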
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229183542 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +import junit.framework.TestCase; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOExceptionWithCause; +import org.junit.*; + +/** + * multi-thread Test suite for {@link CarbonReader} + */ +public class ConcurrentSdkReaderTest { + + private static final String dataDir = "./testReadFiles"; + + @Before + @After + public void cleanTestData() { + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private 
void writeTestData(long numRows, int tableBlockSize) { + Field[] fields = new Field[2]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + + Map<String, String> tableProperties = new HashMap<>(); + tableProperties.put("table_blocksize", Integer.toString(tableBlockSize)); + + CarbonWriterBuilder builder = + CarbonWriter.builder().outputPath(dataDir).withTableProperties(tableProperties) + .withCsvInput(new Schema(fields)); + + try { + CarbonWriter writer = builder.build(); + + for (long i = 0; i < numRows; ++i) { + writer.write(new String[] { "robot_" + i, String.valueOf(i) }); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test public void testReadParallely() throws IOException, InterruptedException { + long numRows = 10000000; --- End diff -- We must not add huge record test case in UT. PR builder time for all the PR will affect by this. Locally test with huge data but update UT with fewer rows. say 10 rows. --- |
Github user NamanRastogi commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r229299196 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Return a new list of {@link CarbonReader} objects + * --- End diff -- Done! --- |
Github user shardul-cr7 commented on the issue:
https://github.com/apache/carbondata/pull/2850 retest this please

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r230244674 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +115,57 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Breaks the list of CarbonRecordReader in CarbonReader into multiple + * CarbonReader objects, each iterating through some 'carbondata' files + * and return that list of CarbonReader objects + * + * If the no. of files is greater than maxSplits, then break the + * CarbonReader into maxSplits splits, with each split iterating + * through >= 1 file. + * + * If the no. of files is less than maxSplits, then return list of + * CarbonReader with size as the no. of files, with each CarbonReader + * iterating through exactly one file + * + * @param maxSplits: Int + * @return list of {@link CarbonReader} objects + */ + public List<CarbonReader> split(int maxSplits) throws IOException { --- End diff -- I feel this method should be moved to builder. Add another method in builder `build(int splits)` and return List of readers. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2850#discussion_r230246531 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java --- @@ -114,6 +115,57 @@ public static CarbonReaderBuilder builder(String tablePath) { return builder(tablePath, tableName); } + /** + * Breaks the list of CarbonRecordReader in CarbonReader into multiple + * CarbonReader objects, each iterating through some 'carbondata' files + * and return that list of CarbonReader objects + * + * If the no. of files is greater than maxSplits, then break the + * CarbonReader into maxSplits splits, with each split iterating + * through >= 1 file. + * + * If the no. of files is less than maxSplits, then return list of + * CarbonReader with size as the no. of files, with each CarbonReader + * iterating through exactly one file + * + * @param maxSplits: Int + * @return list of {@link CarbonReader} objects + */ + public List<CarbonReader> split(int maxSplits) throws IOException { + validateReader(); + if (maxSplits < 1) { + throw new RuntimeException( + this.getClass().getSimpleName() + ".split: maxSplits must be positive"); + } + + List<CarbonReader> carbonReaders = new ArrayList<>(); + + if (maxSplits < this.readers.size()) { --- End diff -- Add UT only to this method to make sure splits happen correctly with multiple splits combinations and readers size, --- |
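A unit test along the lines suggested could sweep (reader count, maxSplits) combinations and assert that the ceil-based boundaries partition the reader list exactly. The sketch below uses plain ints in place of the reader list, with hypothetical names:

```java
// Property check for the split boundary math: every reader index must land
// in exactly one split, and no split may be empty.
public class SplitCombinationsCheck {
  // Returns the number of splits, or -1 if coverage is broken.
  static int coveredOnce(int numReaders, int maxSplits) {
    int splits = Math.min(maxSplits, numReaders);
    int[] seen = new int[numReaders];
    for (int i = 0; i < splits; i++) {
      int start = (int) Math.ceil((float) (i * numReaders) / splits);
      int end = (int) Math.ceil((float) ((i + 1) * numReaders) / splits);
      if (start >= end) return -1;              // empty split: not allowed
      for (int j = start; j < end; j++) seen[j]++;
    }
    for (int c : seen) {
      if (c != 1) return -1;                    // gap or overlap
    }
    return splits;
  }

  public static void main(String[] args) {
    for (int readers = 1; readers <= 30; readers++) {
      for (int maxSplits = 1; maxSplits <= 10; maxSplits++) {
        if (coveredOnce(readers, maxSplits) < 0) {
          throw new AssertionError(readers + "/" + maxSplits);
        }
      }
    }
    System.out.println("all combinations ok");
  }
}
```

Sweeping small ranges like this keeps the test fast, in line with the earlier review comment about not putting huge-row test cases in the UT.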