GitHub user manishgupta88 opened a pull request:
https://github.com/apache/carbondata/pull/2286

[CARBONDATA-2417] [SDK Writer] SDK writer goes into an infinite wait when the consumer thread dies

Problem: The SDK writer goes into an infinite wait in multi-threaded scenarios.

Analysis: In multi-threaded scenarios where multiple writer threads try to add a row to the RowBatch, staying within the given batch size cannot be ensured because the addition is not synchronized; this can lead to an ArrayIndexOutOfBoundsException or data loss/mismatch issues. Furthermore, if multiple writer threads are adding data to the RowBatch and closeWriter is called immediately after launching all the threads, we do not know when all the threads have finished writing, yet closeWriter returns immediately after setting the close flag to true. This does not ensure complete processing of the data.

Solution: Make the row-addition logic synchronized and modify the code in closeWriter to ensure data completeness.

- [ ] Any interfaces changed? No
- [ ] Any backward compatibility impacted? No
- [ ] Document update required? No
- [ ] Testing done? Manually verified
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/carbondata sdk_writer_infinte_loop_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2286.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2286

----

commit dc48cb81745d752e54268b169419fbcef1085369
Author: manishgupta88 <tomanishgupta18@...>
Date: 2018-05-09T09:38:05Z

    (commit message repeats the Problem/Analysis/Solution description above)

----
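For reference, a minimal sketch of the synchronized write path described above, following the shape of the diff quoted in the review comments below. The lock field declaration is an assumption (the diff only shows the synchronized block being introduced), and addRow's semantics are those of the pre-existing RowBatch class:

    // Sketch of the fix: serialize row addition so concurrent writer threads
    // cannot race on the shared RowBatch.
    private final Object lock = new Object();  // assumption: monitor guarding loadBatch

    public void write(Object[] row) throws InterruptedException {
      if (close) {
        // already might be closed forcefully
        return;
      }
      synchronized (lock) {
        if (!loadBatch.addRow(row)) {
          // batch is full: publish it to the consumer queue and start a new one
          loadBatch.readyRead();
          queue.put(loadBatch);
          loadBatch = new RowBatch(batchSize);
        }
      }
    }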
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4605/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5764/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2286

SDV Build Success. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/4825/

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187105735

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
@@ -51,10 +57,16 @@ public void write(Object[] row) throws InterruptedException {
       // already might be closed forcefully
       return;
     }
-    if (!loadBatch.addRow(row)) {
-      loadBatch.readyRead();
-      queue.put(loadBatch);
-      loadBatch = new RowBatch(batchSize);
+    // synchronization block is added for multi threaded scenarios where multiple instances of
+    // writer thread are trying to add a row to the RowBatch. In those cases addition to given
+    // batch size cannot be ensured and it can lead to ArrayIndexOutOfBound Exception or data
+    // loss/mismatch issues
+    synchronized (lock) {
--- End diff --

Adding synchronisation for each row will hurt data loading performance when using the output format. The current writer interface is meant for a single thread. Even if you add synchronisation to handle the multi-threaded scenario, there is still only one consumer, so it will not help. We need to add one more API where the user can pass the number of threads; based on that we can create multiple producers (CarbonOutputIteratorWrapper) and pass the same consumer to all of them.

---
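To illustrate the design kumarvishal09 proposes, here is a hypothetical sketch with one CarbonOutputIteratorWrapper producer per writer thread, so no per-row lock is needed. Only CarbonOutputIteratorWrapper, write, and closeWriter are names from this thread; the constructor usage and the consumer wiring are assumptions:

    import java.util.ArrayList;
    import java.util.List;

    public class MultiProducerSketch {
      public static void main(String[] args) throws Exception {
        int numThreads = 4;
        // One producer per thread removes the need to synchronize addRow.
        List<CarbonOutputIteratorWrapper> producers = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
          producers.add(new CarbonOutputIteratorWrapper());
        }
        // startConsumer(producers);  // hypothetical: one consumer drains all producers
        List<Thread> writers = new ArrayList<>();
        for (CarbonOutputIteratorWrapper producer : producers) {
          Thread t = new Thread(() -> {
            try {
              for (int row = 0; row < 1000; row++) {
                producer.write(new Object[] { row, "value-" + row });
              }
              producer.closeWriter(false);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
          writers.add(t);
          t.start();
        }
        for (Thread t : writers) {
          t.join();
        }
      }
    }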
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187242849

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
(quoting the same synchronized-block hunk as above)
--- End diff --

Even though the current writer interface is for a single thread, we can't block its usage in multi-threaded scenarios, i.e. the write method being called by multiple threads on the same writer instance.

1. If there is a single writer instance and only one thread calls the write interface, there is no impact on performance: the calls arrive one by one from the same thread, and the lock is only ever acquired by that thread.
2. If there is a single writer and multiple threads call the write interface on the same writer instance, locking is required, because adding a row to the RowBatch is not synchronized and can lead to an ArrayIndexOutOfBoundsException or data loss/mismatch issues.

---
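To make the race in case 2 concrete, here is a hypothetical, stripped-down batch buffer (not the actual RowBatch code) showing why an unsynchronized check-then-act on the size counter can overrun the array:

    // Hypothetical illustration of the unsynchronized addRow race.
    class NaiveRowBatch {
      private final Object[][] rows;
      private int size = 0;

      NaiveRowBatch(int batchSize) {
        rows = new Object[batchSize][];
      }

      // Without external synchronization, two threads can both pass the size
      // check while only one slot remains; the slower one then writes past the
      // end of the array, throwing ArrayIndexOutOfBoundsException (or, with
      // lost updates to size, silently overwriting a row).
      boolean addRow(Object[] row) {
        if (size >= rows.length) {
          return false;  // batch full
        }
        rows[size++] = row;  // not atomic: read size, store row, write size + 1
        return true;
      }
    }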
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187245016

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
(quoting the same synchronized-block hunk as above)
--- End diff --

This is a developer SDK we are providing, not an end-user interface. The developer needs to follow the Javadoc provided by the SDK. Many Java APIs are not thread-safe either, and that is stated in their docs as well, so this is no different. For multi-threaded loading we can add another SDK API to load faster. In this PR you are only allowing multiple threads at the writing stage, which is of little use if the internal data loader does not support multithreading. So we should provide another API for multithreaded or parallel load scenarios.

---
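As a hypothetical example of the kind of documented contract ravipesala refers to, a thread-safety note could be stated directly in the writer's Javadoc (illustrative wording, not the SDK's actual documentation):

    /**
     * Writes one row into the current batch.
     * <p>
     * This writer is NOT thread-safe. Each instance must be used from a single
     * thread; for parallel loads, use one writer instance per thread (or a
     * dedicated multi-threaded load API, if provided).
     */
    public void write(Object[] row) throws InterruptedException { ... }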
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187247243

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
(quoting the same synchronized-block hunk as above)
--- End diff --

@kumarvishal09 @ravipesala ... I got your point. In that case this PR is not required.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5789/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Success with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4634/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2286

SDV Build Success. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/4842/

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187264257

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
@@ -98,14 +99,31 @@ public void closeWriter(boolean isForceClose) {
       close = true;
       return;
     }
-    loadBatch.readyRead();
-    if (loadBatch.size > 0) {
-      queue.put(loadBatch);
+    // flag to check whether all the RowBatches have been consumed
+    boolean processingFinished = false;
--- End diff --

Since this class will be created per thread, as mentioned in the earlier comment, I think the change below is also not required: in the multi-threaded case each writer will have its own CarbonOutputIteratorWrapper object.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2286#discussion_r187274823

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
(quoting the same closeWriter hunk as above)
--- End diff --

As discussed with you, this change is required because put is a blocking call: if the consumer is slow, or an exception occurs on the consumer side, the writer thread may still be blocked in put on the last RowBatch.

---
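To show why a bare queue.put can hang in closeWriter, and one way to bound the wait, here is a hypothetical sketch (the actual PR uses a processingFinished flag instead; the class, method name, and timeout below are illustrative assumptions):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class CloseWriterSketch {
      private final ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(2);

      // queue.put(lastBatch) blocks forever if the consumer thread has died.
      // Offering with a timeout lets the producer detect a stalled or dead
      // consumer and fail fast instead of waiting indefinitely.
      void closeWriterSafely(Object[] lastBatch) throws InterruptedException {
        boolean accepted = queue.offer(lastBatch, 30, TimeUnit.SECONDS);
        if (!accepted) {
          throw new IllegalStateException(
              "Consumer did not drain the queue in time; it may have died");
        }
      }
    }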
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5805/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2286

Build Success with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4650/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2286

SDV Build Success. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/4857/

---