Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2672#discussion_r214509623 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -460,27 +461,32 @@ public CarbonLoadModel getLoadModel() { private CarbonOutputIteratorWrapper[] iterators; - private int counter; + // keep counts of number of writes called + // and it is used to load balance each write call to one iterator. + private AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { super(null, dataLoadExecutor, loadModel, future, executorService); this.iterators = iterators; + counter = new AtomicLong(0); } - @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iterators[counter].write(objects.get()); - if (++counter == iterators.length) { - //round robin reset - counter = 0; + int iteratorLength = iterators.length; + int iteratorNum = (int) (counter.incrementAndGet() % iteratorLength); + synchronized (iterators[iteratorNum]) { --- End diff -- Giving multiple writers to the user would complicate usability, because the user would need to take care of closing each writer. So I chose a simpler design. --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2672#discussion_r214545548 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -460,27 +461,32 @@ public CarbonLoadModel getLoadModel() { private CarbonOutputIteratorWrapper[] iterators; - private int counter; + // keep counts of number of writes called + // and it is used to load balance each write call to one iterator. + private AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { super(null, dataLoadExecutor, loadModel, future, executorService); this.iterators = iterators; + counter = new AtomicLong(0); } - @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iterators[counter].write(objects.get()); - if (++counter == iterators.length) { - //round robin reset - counter = 0; + int iteratorLength = iterators.length; --- End diff -- Use iterators.length directly; there is no need to assign it to a separate variable. --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/2672 @ajantha-bhat, please check my comment. --- |
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2672#discussion_r214546012 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -460,27 +461,32 @@ public CarbonLoadModel getLoadModel() { private CarbonOutputIteratorWrapper[] iterators; - private int counter; + // keep counts of number of writes called + // and it is used to load balance each write call to one iterator. + private AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { super(null, dataLoadExecutor, loadModel, future, executorService); this.iterators = iterators; + counter = new AtomicLong(0); } - @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iterators[counter].write(objects.get()); - if (++counter == iterators.length) { - //round robin reset - counter = 0; + int iteratorLength = iterators.length; --- End diff -- Done. Fixed it. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2672 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8233/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2672 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/162/ --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/2672 LGTM --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/2672 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2672 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8330/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2672 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/260/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2672 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/12/ --- |
In reply to this post by qiuchenjian-2
|
Free forum by Nabble | Edit this page |