GitHub user kumarvishal09 opened a pull request:
https://github.com/apache/carbondata/pull/2841

[WIP] Unsafe fallback to heap and unsafe query fix

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done. Please provide details on:
  - Whether new unit test cases have been added, or why no new tests are required?
  - How is it tested? Please attach the test report.
  - Is it a performance-related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata master_17

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2841.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2841

----

commit d8a89afb317cd8acec061be03ee9a81232479b75
Author: kumarvishal09 <kumarvishal1802@...>
Date: 2018-10-18T07:31:28Z

    fixed unsafe problem

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/920/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1129/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9190/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1141/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/935/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9199/

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227246688

--- Diff: processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---

@@ -292,7 +300,7 @@ private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
      complexDimensionPages[index] = new ComplexColumnPage(complexColumnInfoList);
      try {
        complexDimensionPages[index].initialize(
-          model.getColumnLocalDictGenMap(), pageSize, columnCompressor);
+          model.getColumnLocalDictGenMap(), pageSize, columnCompressor, unsafe);

--- End diff --

Replace `unsafe` with `isUnsafe`.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227246446

--- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---

@@ -124,8 +116,21 @@ private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String ta
            + "Bytes");
      }
      return allocate;
+    } else {
+      MemoryBlock allocate = MemoryAllocator.HEAP.allocate(memoryRequested);
+      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+      if (null == listOfMemoryBlock) {
+        listOfMemoryBlock = new HashSet<>();
+        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+      }
+      listOfMemoryBlock.add(allocate);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Memory block (" + allocate + ") is created with size " + allocate.size()
+            + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+            + "Bytes");

--- End diff --

For the on-heap case, totalMemory can also be 0 if it is not configured, so "bytes left" cannot be printed in the debug logs. For printing the memory used, add the allocated size to the memory used in the on-heap case.

---
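The pattern under review allocates off-heap while the configured budget allows it and otherwise serves the request from the heap, re-evaluating on every call rather than switching permanently. The following is a minimal, hypothetical Java sketch of that pattern, not the actual CarbonData implementation: byte arrays stand in for MemoryBlock and both allocators, and the class and method names are invented for illustration. It also reflects the review point above by tracking on-heap usage in its own counter instead of computing "bytes left" from a possibly unconfigured off-heap total.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class HeapFallbackAllocator {
  private final long totalOffHeap;  // 0 means off-heap is not configured
  private long offHeapUsed;
  private long heapUsed;            // tracked separately for the on-heap case
  private final Map<String, Set<byte[]>> taskIdToBlocks = new HashMap<>();

  HeapFallbackAllocator(long totalOffHeap) {
    this.totalOffHeap = totalOffHeap;
  }

  // Each call decides independently: off-heap if the budget still has room,
  // otherwise heap. A later call can go off-heap again once memory is freed.
  public synchronized byte[] allocate(String taskId, int requested) {
    byte[] block = new byte[requested];  // stand-in for a real MemoryBlock
    if (offHeapUsed + requested <= totalOffHeap) {
      offHeapUsed += requested;          // off-heap path
    } else {
      heapUsed += requested;             // heap fallback path
    }
    // remember the block per task so it can be freed when the task finishes
    taskIdToBlocks.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
    return block;
  }

  public synchronized long offHeapUsed() { return offHeapUsed; }
  public synchronized long heapUsed() { return heapUsed; }
}
```

With a 100-byte off-heap budget, a first 80-byte request is served off-heap and a second one falls back to the heap, which is the behavior the reviewer and author are discussing above.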
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227239817

--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java ---

@@ -73,8 +73,9 @@ public ComplexColumnPage(List<ComplexColumnInfo> complexColumnInfoList) {
    * if memory is not sufficient
    */
  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize,
-      String columnCompressor) throws MemoryException {
+      String columnCompressor, boolean isUnsafe) throws MemoryException {
    DataType dataType;
+

--- End diff --

Remove the extra blank line.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227241237

--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java ---

@@ -125,46 +126,48 @@ public void setDoublePage(double[] doubleData) {
    * Create a new column page for decimal page
    */
  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      String compressorName) throws MemoryException {
+      String compressorName, boolean isUnsafe) throws MemoryException {
    DecimalConverterFactory.DecimalConverter decimalConverter =
        DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
            columnSpec.getScale());
    int size = decimalConverter.getSize();
    if (size < 0) {
      return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
          DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
-          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName, isUnsafe);
    } else {
      // Here the size is always fixed.
-      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
+      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName, isUnsafe);
    }
  }

  /**
   * Create a new column page based on the LV (Length Value) encoded bytes
   */
  static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      int lvLength, String compressorName) throws MemoryException {
+      int lvLength, String compressorName, boolean isUnsafe) throws MemoryException {
    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
-        lvLength, compressorName);
+        lvLength, compressorName, isUnsafe);
  }

  /**
   * Create a new column page based on the LV (Length Value) encoded bytes
   */
  static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException {
+      byte[] lvEncodedBytes, int lvLength, String compressorName, boolean isUnsafe)
+      throws MemoryException {
    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
-        lvLength, compressorName);
+        lvLength, compressorName, isUnsafe);
  }

  private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
+      byte[] lvEncodedBytes, int size, String compressorName, boolean unsafe)
+      throws MemoryException {
    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
        .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
    ColumnPage rowOffset = ColumnPage.newPage(
        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
-        CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+        CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, unsafe);

--- End diff --

Change the flag name `unsafe` to `isUnsafe`, the same as in the changes above. Keep the flag name consistent everywhere.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227238938

--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java ---

@@ -168,13 +169,16 @@ public CarbonDictionary getLocalDictionary() {
    */
  public static CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk,
      Compressor compressor) throws IOException, MemoryException {
+    boolean isUnsafe = Boolean.parseBoolean(CarbonProperties.getInstance()

--- End diff --

Similar to the other places below, this is also a query execution flow, so the parameter `CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION` needs to be considered here as well when deciding on unsafe. Or is only one parameter sufficient?

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227235905

--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java ---

@@ -168,13 +169,16 @@ public CarbonDictionary getLocalDictionary() {
    */
  public static CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk,
      Compressor compressor) throws IOException, MemoryException {
+    boolean isUnsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
+            CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE));

--- End diff --

Move this flag inside the null check block.

---
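The suggestion above is to read the property only on the path that actually decodes the dictionary, i.e. inside the null check, instead of on every call. The following is a minimal, hypothetical sketch of that idea, not CarbonData code: java.util.Properties stands in for CarbonProperties, a String stands in for the decoded CarbonDictionary, and the property key and class names here are illustrative stand-ins.

```java
import java.util.Properties;

class DictionaryLoader {
  // stand-in key for CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION
  private static final String ENABLE_UNSAFE_IN_QUERY = "enable.unsafe.in.query.processing";

  private final Properties props;
  private String cachedDictionary;  // stand-in for the decoded CarbonDictionary

  DictionaryLoader(Properties props) {
    this.props = props;
  }

  public String getDictionary(String encodedChunk) {
    if (cachedDictionary == null) {
      // the flag is read only when decoding is actually needed,
      // per the reviewer's suggestion to move it inside the null check
      boolean isUnsafe =
          Boolean.parseBoolean(props.getProperty(ENABLE_UNSAFE_IN_QUERY, "true"));
      cachedDictionary = decode(encodedChunk, isUnsafe);
    }
    return cachedDictionary;
  }

  // simplified stand-in for the unsafe/heap decode paths
  private String decode(String chunk, boolean isUnsafe) {
    return (isUnsafe ? "unsafe:" : "heap:") + chunk;
  }
}
```

Besides avoiding a property lookup on the cached path, this keeps the flag's value consistent with the decode that actually used it.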
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9219/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1164/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/955/

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2841#discussion_r227339904

--- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---

@@ -124,8 +116,21 @@ private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String ta
            + "Bytes");
      }
      return allocate;
+    } else {
+      MemoryBlock allocate = MemoryAllocator.HEAP.allocate(memoryRequested);

--- End diff --

It is not actually a permanent fallback: once off-heap memory becomes available again, allocation will switch back to off-heap, depending on off-heap memory. I will add a debug log.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/967/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9237/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2841

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1183/

---