akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r378874531 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java ########## @@ -80,29 +80,25 @@ public void initialize(SortParameters sortParameters) { public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); - UnsafeSortDataRows sortDataRow = - new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB); + UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[iterators.length];
Review comment: I think the number of threads depends on the number of iterators, right? So the sort will run in parallel based on the iterators. Or are you planning to define a separate property for it?
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379214468 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ########## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
Review comment: This only modifies one parameter of the original code.
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379215166 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java ########## @@ -80,29 +80,25 @@ public void initialize(SortParameters sortParameters) { public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); - UnsafeSortDataRows sortDataRow = - new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB); + UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[iterators.length];
Review comment: Please check line 91 of the original code. As I said above, we can limit the pool size. We can use the existing parameter from `sortParameters.getNumberOfCores()`.
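To illustrate the suggestion of capping the pool size with the existing core count, here is a minimal sketch in plain Java. It is not the CarbonData implementation: `BoundedSortPoolSketch`, `drainAll`, and the `Integer` row type are hypothetical stand-ins, and `numberOfCores` merely plays the role of the value that `sortParameters.getNumberOfCores()` would supply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: one task per input iterator, executed on a pool
// whose size is capped by the configured number of cores.
final class BoundedSortPoolSketch {

  // Drains every iterator on a bounded pool and returns the total row count.
  static int drainAll(List<Iterator<Integer>> iterators, int numberOfCores) {
    // Never create more threads than cores, and never more than there are inputs.
    int poolSize = Math.max(1, Math.min(numberOfCores, iterators.size()));
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<Integer>> results = new ArrayList<>();
      for (Iterator<Integer> it : iterators) {
        // Each task owns its iterator, so no locking is needed inside the task.
        results.add(pool.submit(() -> {
          int rows = 0;
          while (it.hasNext()) {
            it.next();
            rows++;
          }
          return rows;
        }));
      }
      int total = 0;
      for (Future<Integer> result : results) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("sort task failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Iterator<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 2, 3).iterator(),
        Arrays.asList(4, 5).iterator());
    System.out.println(drainAll(inputs, 1));
  }
}
```

With a pool smaller than the number of iterators, the remaining tasks simply queue until a thread is free, so the degree of parallelism follows the core count rather than the input count.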
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379215697 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /**
Review comment: When the method name is meaningful, a comment is not necessary. Keep the code clean.
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216009 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) {
Review comment: This only removes `synchronized` from the original code.
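The `addRowBatch` logic quoted in this thread fills a fixed-size page, hands the full page off, and carries the rest of the batch into a fresh page. The following is a minimal sketch of that buffer-fill pattern, assuming each buffer is used by a single thread (which is presumably why the lock could be dropped once there is one sorter per iterator). `PageBufferSketch`, `flushedPages`, and `pending` are hypothetical names; the flush here only records the page instead of sorting and writing it. Unlike the quoted diff, which flushes at most once per call, the sketch loops so that a batch larger than the page is also handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a single-threaded page buffer: no synchronization,
// because each instance is assumed to be confined to one thread.
final class PageBufferSketch {
  private final int capacity;
  private Object[][] page;
  private int entryCount;
  // Stand-in for "sort and write to a temp file": full pages are just collected.
  final List<Object[][]> flushedPages = new ArrayList<>();

  PageBufferSketch(int capacity) {
    this.capacity = capacity;
    this.page = new Object[capacity][];
  }

  void addRowBatch(Object[][] rowBatch, int size) {
    int offset = 0;
    // Same trigger as the diff: flush when the incoming rows reach the capacity.
    while (entryCount + (size - offset) >= capacity) {
      int sizeLeft = capacity - entryCount;
      System.arraycopy(rowBatch, offset, page, entryCount, sizeLeft);
      flushedPages.add(page);
      // Start a new page; the flushed array is never written to again.
      page = new Object[capacity][];
      entryCount = 0;
      offset += sizeLeft;
    }
    int remaining = size - offset;
    System.arraycopy(rowBatch, offset, page, entryCount, remaining);
    entryCount += remaining;
  }

  int pending() {
    return entryCount;
  }

  public static void main(String[] args) {
    PageBufferSketch buffer = new PageBufferSketch(4);
    Object[][] batch = { {1}, {2}, {3} };
    buffer.addRowBatch(batch, 3);
    buffer.addRowBatch(batch, 3);
    System.out.println(buffer.flushedPages.size() + " page(s) flushed, "
        + buffer.pending() + " row(s) pending");
  }
}
```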
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216281 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - 
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("************ Writing to temp file ********** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { + System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { + return; + } + } + System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); + entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { + try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(recordHolderArray, + new NewRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); + } else { + Arrays.sort(recordHolderArray, + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime()
Review comment: This was moved from `DataSorterAndWriter`.
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216889 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - 
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("************ Writing to temp file ********** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { + System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { + return; + } + } + System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); + entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { + try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(recordHolderArray, + new NewRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); + } else { + Arrays.sort(recordHolderArray, + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches 20 then
Review comment: This was moved from `DataSorterAndWriter`. By the way, I recommend going through the code instead of only peeking at the changes in git.
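The `handlePreviousPage` method quoted above sorts a full page and then picks one of several temp folders at random for the spill file. A minimal sketch of those two steps follows; it is an illustration under assumptions, not the project's code. `SortAndSpillSketch`, `sortPage`, `chooseTempFile`, and the `.tmp` extension are hypothetical (the actual value of `SORT_TEMP_FILE_EXT` does not appear in this thread), and the comparator is supplied by the caller instead of the `NewRowComparator` classes.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

// Hypothetical sketch of "sort the page, then pick a temp file location".
final class SortAndSpillSketch {
  // Placeholder extension; the real constant value is not shown in the thread.
  static final String TEMP_EXT = ".tmp";

  // Sorts the rows of one page in place with the supplied row comparator.
  static void sortPage(Object[][] page, Comparator<Object[]> rowComparator) {
    Arrays.sort(page, rowComparator);
  }

  // Picks one of the configured folders at random and builds a unique file name,
  // mirroring the tableName_rangeId_nanoTime pattern in the diff.
  static File chooseTempFile(String[] tempLocations, String tableName, int rangeId,
      Random random) {
    String chosen = tempLocations[random.nextInt(tempLocations.length)];
    return new File(chosen + File.separator + tableName + '_' + rangeId + '_'
        + System.nanoTime() + TEMP_EXT);
  }

  public static void main(String[] args) {
    Object[][] page = { {3, "c"}, {1, "a"}, {2, "b"} };
    Comparator<Object[]> byFirstColumn = Comparator.comparingInt(row -> (Integer) row[0]);
    sortPage(page, byFirstColumn);
    File target = chooseTempFile(new String[] {"tmp1", "tmp2"}, "t1", 0, new Random());
    System.out.println(page[0][1] + " first, spill to " + target.getPath());
  }
}
```

Spreading spill files across folders at random is a simple way to balance I/O when the configured locations sit on different disks; whether that matches the deployment is an assumption of this sketch.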
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379217256 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ########## @@ -72,20 +67,11 @@ public void addFileToMerge(File sortTempFile) { // intermediate merging of sort temp files will be triggered synchronized (lockObject) { procFiles.add(sortTempFile); - } - } - - public void startMergingIfPossible() { - File[] fileList; - if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { - synchronized (lockObject) { - fileList = procFiles.toArray(new File[procFiles.size()]); - this.procFiles = new ArrayList<File>(); + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { + File[] fileList = procFiles.toArray(new File[procFiles.size()]); + this.procFiles = new ArrayList<>(); + startIntermediateMerging(fileList);
Review comment: `procFiles.toArray()` returns `Object[]`, which is a type mismatch.
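The type mismatch mentioned above comes from the two `toArray` overloads on `java.util.List`: the no-argument form returns `Object[]`, while the form that takes an array returns an array of that element type. A minimal sketch, where `ToArraySketch` and the file names are made up for illustration:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of the two List.toArray overloads.
final class ToArraySketch {

  // Typed overload: the argument's runtime type decides the result type.
  static File[] typedCopy(List<File> files) {
    return files.toArray(new File[0]);
  }

  // Untyped overload: always an Object[], even for a List<File>.
  static Object[] untypedCopy(List<File> files) {
    return files.toArray();
  }

  public static void main(String[] args) {
    List<File> procFiles = new ArrayList<>();
    procFiles.add(new File("a.sorttemp"));
    procFiles.add(new File("b.sorttemp"));

    File[] typed = typedCopy(procFiles);
    Object[] untyped = untypedCopy(procFiles);
    System.out.println(typed.getClass().getSimpleName());
    System.out.println(untyped.getClass().getSimpleName());

    // A cast does not help: this line would compile, but it fails at runtime
    // with ClassCastException because the array really is an Object[].
    // File[] broken = (File[]) procFiles.toArray();
  }
}
```

Passing `new File[0]` and passing `new File[procFiles.size()]` both yield a correctly typed array; the diff uses the pre-sized form.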
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379218383 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ########## @@ -101,6 +87,10 @@ private void startIntermediateMerging(File[] intermediateFiles) { + '_' + parameters.getRangeId() + '_' + System.nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sumitting request for intermediate merging no of files: "
Review comment: OK.
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250672 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) {
Review comment: Yes, but it is better to rename it so that the name is more meaningful.
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250819 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /**
Review comment: I suggest improving the comments to explain what exactly the method does and how it does it, so that any new developer can understand it better.
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250891 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ########## @@ -72,20 +67,11 @@ public void addFileToMerge(File sortTempFile) { // intermediate merging of sort temp files will be triggered synchronized (lockObject) { procFiles.add(sortTempFile); - } - } - - public void startMergingIfPossible() { - File[] fileList; - if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { - synchronized (lockObject) { - fileList = procFiles.toArray(new File[procFiles.size()]); - this.procFiles = new ArrayList<File>(); + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { + File[] fileList = procFiles.toArray(new File[procFiles.size()]); + this.procFiles = new ArrayList<>(); + startIntermediateMerging(fileList);
Review comment: Yes, that's why I also mentioned the type cast in my comment.
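The revised `addFileToMerge` in the diff above adds the file and checks the merge threshold inside a single `synchronized` block, so the size check and the list swap can no longer interleave with another thread's add. A minimal sketch of that pattern follows. `MergeBatcherSketch`, `threshold`, and `submitted` are hypothetical: `threshold` stands in for `getNumberOfIntermediateFileToBeMerged()`, and collecting the batch in a list stands in for `startIntermediateMerging(fileList)`.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collect files under a lock and hand off a batch
// as soon as the threshold is reached.
final class MergeBatcherSketch {
  private final Object lockObject = new Object();
  private final int threshold;
  private List<File> procFiles = new ArrayList<>();
  // Stand-in for submitting a merge task: batches are simply recorded.
  final List<File[]> submitted = new ArrayList<>();

  MergeBatcherSketch(int threshold) {
    this.threshold = threshold;
  }

  void addFileToMerge(File sortTempFile) {
    synchronized (lockObject) {
      procFiles.add(sortTempFile);
      // Add, check, snapshot and swap happen atomically with respect to other callers.
      if (procFiles.size() >= threshold) {
        File[] fileList = procFiles.toArray(new File[0]);
        procFiles = new ArrayList<>();
        submitted.add(fileList);
      }
    }
  }

  int pending() {
    synchronized (lockObject) {
      return procFiles.size();
    }
  }

  public static void main(String[] args) {
    MergeBatcherSketch batcher = new MergeBatcherSketch(2);
    batcher.addFileToMerge(new File("1.sorttemp"));
    batcher.addFileToMerge(new File("2.sorttemp"));
    batcher.addFileToMerge(new File("3.sorttemp"));
    System.out.println(batcher.submitted.size() + " batch(es), "
        + batcher.pending() + " file(s) pending");
  }
}
```

In the sketch the hand-off itself is cheap, so doing it inside the lock is harmless; if submitting a real merge task were slow, moving the submission outside the lock would be worth considering.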
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250976 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - 
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("************ Writing to temp file ********** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { + System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { + return; + } + } + System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); + entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { + try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(recordHolderArray, + new NewRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); + } else { + Arrays.sort(recordHolderArray, + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() Review comment: Yes, I saw the original code, but since you are refactoring in this PR, it is better to change it now.
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251111 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - 
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("************ Writing to temp file ********** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { + System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { + return; + } + } + System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); + entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { + try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(recordHolderArray, + new NewRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); + } else { + Arrays.sort(recordHolderArray, + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches 20 then Review comment: Yes, I already compared the old code with yours, but it is better to improve the code while we are refactoring; that is just my suggestion.
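For readers following the diff, the buffer handling in the new `addRowBatch` can be sketched roughly as follows. This is a minimal illustration and not the PR's code: the class `SortBufferSketch`, the use of `int` rows, and the in-memory `sortedPages` list (standing in for the sort temp files) are all assumptions made to keep the example self-contained. Unlike the diff, the sketch loops, so a batch larger than the buffer is also handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SortDataRows: rows are plain ints and
// "writing a sort temp file" is modelled as appending to a list.
final class SortBufferSketch {
  private final int capacity;      // plays the role of sortBufferSize
  private int[] buffer;            // plays the role of recordHolderList
  private int count;               // plays the role of entryCount
  private final List<int[]> sortedPages = new ArrayList<>();

  SortBufferSketch(int capacity) {
    this.capacity = capacity;
    this.buffer = new int[capacity];
  }

  // Copy rows into the buffer; whenever the buffer becomes full,
  // sort and hand off the full page, then continue with a fresh buffer.
  void addBatch(int[] batch, int size) {
    int offset = 0;
    while (size > 0) {
      int toCopy = Math.min(capacity - count, size);
      System.arraycopy(batch, offset, buffer, count, toCopy);
      count += toCopy;
      offset += toCopy;
      size -= toCopy;
      if (count == capacity) {
        handlePreviousPage(buffer);
        buffer = new int[capacity];
        count = 0;
      }
    }
  }

  // Sort the full page in the caller's thread (no executor, no semaphore).
  private void handlePreviousPage(int[] page) {
    Arrays.sort(page);
    sortedPages.add(page);
  }

  List<int[]> pages() {
    return sortedPages;
  }

  int pending() {
    return count;
  }

  public static void main(String[] args) {
    SortBufferSketch sketch = new SortBufferSketch(3);
    sketch.addBatch(new int[] {5, 1, 4, 2, 3}, 5);
    System.out.println("pages=" + sketch.pages().size() + ", pending=" + sketch.pending());
  }
}
```

Because each sorter instance owns its buffer and sorts in the calling thread, no lock around the add path is needed in this sketch, which appears to be the intent of dropping `synchronized (addRowsLock)` in the diff.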
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251181 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ########## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: It is better to refactor while we have the chance.
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379252776 ########## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ########## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("************ Writing to temp file ********** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { - LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); - throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file - synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("************ Writing to temp file ********** "); - } - intermediateFileMerger.startMergingIfPossible(); - Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; - if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); - } - try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (Exception e) { - LOGGER.error( - 
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); - } - // create the new holder Array - this.recordHolderList = new Object[this.sortBufferSize][]; - this.entryCount = 0; - size = size - sizeLeft; - if (size == 0) { - return; - } + int sizeLeft = 0; + if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("************ Writing to temp file ********** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { + System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { + return; + } + } + System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); + entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { + try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(recordHolderArray, + new NewRowComparator(parameters.getNoDictionarySortColumn(), + parameters.getNoDictDataType())); + } else { + Arrays.sort(recordHolderArray, + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches 20 then Review comment: ```suggestion // add sort temp filename to arrayList. When the list size reaches 20 then ``` Do it like the above; the word "and" is not required.
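The temp-file naming shown in the diff above (pick one of the configured temp locations at random, then append table name, range id, and a nanosecond timestamp) could be isolated along these lines. This is only a sketch under stated assumptions: the helper `chooseSortTempFile`, its parameters, and the `.sorttemp` extension used in `main` are hypothetical placeholders, and `ThreadLocalRandom` is swapped in here for the `new Random()` that the diff creates on every call; the review thread does not ask for that change.

```java
import java.io.File;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the PR.
final class SortTempFileNaming {

  // Builds <dir>/<tableName>_<rangeId>_<nanoTime><ext>, choosing <dir>
  // at random from the configured temp locations.
  static File chooseSortTempFile(String[] tempDirs, String tableName, int rangeId, String ext) {
    // ThreadLocalRandom avoids allocating a new Random per page (an assumption
    // about what would be preferable, not something the source states).
    String dir = tempDirs[ThreadLocalRandom.current().nextInt(tempDirs.length)];
    return new File(dir + File.separator + tableName + '_' + rangeId + '_'
        + System.nanoTime() + ext);
  }

  public static void main(String[] args) {
    // ".sorttemp" is a placeholder for CarbonCommonConstants.SORT_TEMP_FILE_EXT.
    File f = chooseSortTempFile(new String[] {"tmpA", "tmpB"}, "t1", 0, ".sorttemp");
    System.out.println(f.getPath());
  }
}
```

Note that `System.nanoTime()` makes a name collision unlikely but does not guarantee uniqueness when several sorter instances write to the same directory at once.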
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251181 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ########## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: It is better to refactor while we have the chance; otherwise it will get skipped.