GitHub user xuchuanyin opened a pull request:
https://github.com/apache/carbondata/pull/2443

[CARBONDATA-2685][DataMap] Parallelize datamap rebuild processing for segments

Currently in CarbonData, while rebuilding a datamap, one Spark job is started for each segment and all the jobs are executed serially. If there are many historical segments, the rebuild takes a lot of time. Here we optimize the datamap rebuild procedure and start one task for each segment, so that all the tasks can run in parallel within a single Spark job.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [ ] Any interfaces changed?
 - [ ] Any backward compatibility impacted?
 - [ ] Document update required?
 - [ ] Testing done
       Please provide details on
       - Whether new unit test cases have been added or why no new tests are required?
       - How it is tested? Please attach test report.
       - Is it a performance related change? Please attach the performance test report.
       - Any additional information to help reviewers in testing this change.
 - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata CARBONDATA-2685_parallelize_datamap_rebuild

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2443.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2443

----

commit 1e2c068d9539b4b749d62392462f719c02295cd8
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-06-29T14:23:55Z

    Fix bugs in bloomfilter for dictionary/sort/date index columns

    For a dictionary column, carbon converts the literal value to a dictionary value, then converts the dictionary value to an MDK value, and finally stores the MDK value as the internal value in the carbon file. For sort columns and date columns, the value is also encoded. Here in the bloomfilter datamap we index on the encoded data, that is to say:
    For dictionary/date columns, we use the surrogate key as the bloom index key;
    For sort columns and ordinary dimensions, we use the plain bytes as the bloom index key;
    For measures, we convert the value to bytes and use it as the bloom index key.

    Changes made:
    1. FieldConverters were refactored to extract common value conversion methods.
    2. BloomQueryModel was optimized to support converting literal values to internal values.
    3. Fix bugs for int/float/date/timestamp as bloom index columns.
    4. Fix bugs for dictionary/sort columns as bloom index columns.
    5. Add tests.
    6. Block (deferred) rebuild for bloom datamap (it contains bugs that are not fixed in this commit; another PR has been raised).

commit f75afe9d380aa3f6821a1c2eda666da54b0d437d
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-06-29T15:27:55Z

    fix review comments

commit d51c528af6aa363298f5a188d794ba76939bd942
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-06-30T03:17:33Z

    Fix bugs in querying on bloom column with empty value

    Convert null values to the corresponding values while querying on a bloom column.

commit 9f75de4bf3dab4c991c61a72b5c2104073f044ae
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-06-30T03:28:07Z

    Add test for querying on longstring bloom index column

    Supporting longstring as a bloom index column was already done in PR2403; here we only add a test for it.

commit a4a6c60303f6542691ef6d230c141458965ff8e0
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-06-30T09:10:04Z

    Fix bugs for deferred rebuild for bloomfilter datamap

    Previously, when we implemented ISSUE-2633, deferred rebuild was disabled for the bloomfilter datamap due to unhandled bugs. In this commit we fix those bugs and bring the feature back. Methods are extracted to reduce duplicate code.

commit 2e9c703797a199a8d957d37429386651d22e197d
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-07-04T04:03:18Z

    Parallelize datamap rebuild processing for segments

    Currently in CarbonData, while rebuilding a datamap, one Spark job is started for each segment and all the jobs are executed serially. If there are many historical segments, the rebuild takes a lot of time. Here we optimize the datamap rebuild procedure and start one task for each segment, so that all the tasks can run in parallel within a single Spark job.

----

--- |
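The scheduling change described in the PR above can be illustrated with a minimal, self-contained Spark sketch. This is not the actual IndexDataMapRebuildRDD code: the segment ids and the rebuildOneSegment helper are hypothetical placeholders; the point is only the contrast between submitting one Spark job per segment and submitting a single job with one task per segment.

    import org.apache.spark.sql.SparkSession

    // Illustrative only -- not the IndexDataMapRebuildRDD implementation.
    object RebuildSchedulingSketch {

      // Placeholder for the real per-segment datamap rebuild work.
      def rebuildOneSegment(segmentId: String): Unit =
        println(s"rebuilding datamap for segment $segmentId")

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("datamap-rebuild-sketch")
          .getOrCreate()
        val segments = Seq("0", "1", "2", "3") // hypothetical segment ids

        // Old behaviour: one Spark job per segment, submitted serially.
        segments.foreach { seg =>
          spark.sparkContext.parallelize(Seq(seg), numSlices = 1).foreach(rebuildOneSegment)
        }

        // New behaviour: one job whose partitions map one-to-one to segments,
        // so the per-segment tasks run concurrently.
        spark.sparkContext
          .parallelize(segments, numSlices = segments.size)
          .foreach(rebuildOneSegment)

        spark.stop()
      }
    }

In the real PR the partitions are derived from the table's segments and each task builds the datamap for its own segment, but the scheduling idea is the same.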
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2443 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5589/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6745/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5576/ --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2443 This PR depends on PR #2425 --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6761/ --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2443 Do not trigger build before PR2413 is merged --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2443 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5675/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6912/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5697/ --- |
Github user brijoobopanna commented on the issue:
https://github.com/apache/carbondata/pull/2443 retest this please --- |
Github user brijoobopanna commented on the issue:
https://github.com/apache/carbondata/pull/2443 retest sdv please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5708/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2443 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6924/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2443 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5695/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200892993

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala ---
@@ -141,9 +141,10 @@ class RestructureResultImpl extends RestructureResult[Int, Boolean] {
 }

 trait RefreshResult[K, V] extends Serializable {
-  def getKey(key: String, value: Boolean): (K, V)
+  def getKey(key: String, value: (String, Boolean)): (K, V)
--- End diff --

please add a comment to describe the modification
--- |
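To illustrate the kind of comment being requested, a documented version of the changed trait might look like the sketch below. The interpretation of the (String, Boolean) tuple as (segment id, rebuild status) is an assumption made for this example, not something confirmed by the CarbonData sources:

    // Hypothetical, commented sketch of the modified trait.
    trait RefreshResult[K, V] extends Serializable {
      // key:   identifier of the unit being refreshed (assumed: the segment id)
      // value: (segment id, whether the rebuild of that segment succeeded);
      //        returning a per-segment pair lets one job report the status of
      //        every segment instead of a single Boolean for the whole rebuild.
      def getKey(key: String, value: (String, Boolean)): (K, V)
    }

    class RefreshResultImpl extends RefreshResult[String, Boolean] {
      override def getKey(key: String, value: (String, Boolean)): (String, Boolean) = value
    }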
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200893470

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---
@@ -144,13 +149,113 @@ class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[
   }
 }

+/**
+ * This class will generate row value which is raw bytes for the dimensions.
+ */
+class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn])
--- End diff --

Better to move this to a Java class in the carbon-hadoop module, so that all integration modules can use it.
--- |
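For readers unfamiliar with the read-support abstraction under discussion, the idea can be sketched as follows. This is a simplified stand-in rather than the real CarbonReadSupport/RawBytesReadSupport classes: the rebuild path wants the dimension values back in their encoded (raw byte) form, so the datamap indexes exactly what is stored in the carbon files rather than the decoded literal values.

    // Simplified stand-in for the read-support idea; not the CarbonData API.
    trait RowReadSupport[T] extends Serializable {
      def readRow(data: Array[AnyRef]): T
    }

    // Decoding variant: converts each encoded value back to a literal
    // using a per-column decode function.
    class DecodingReadSupport(decoders: Array[AnyRef => AnyRef])
      extends RowReadSupport[Array[AnyRef]] {
      override def readRow(data: Array[AnyRef]): Array[AnyRef] =
        data.zip(decoders).map { case (v, decode) => decode(v) }
    }

    // Raw-bytes variant: leaves the values in their encoded form, which is
    // what a rebuild that indexes the stored representation needs.
    class RawBytesPassThroughReadSupport extends RowReadSupport[Array[AnyRef]] {
      override def readRow(data: Array[AnyRef]): Array[AnyRef] = data
    }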
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200893650

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---
@@ -163,38 +268,51 @@ class IndexDataMapRebuildRDD[K, V](
   override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val dataMapFactory =
-      DataMapManager.get().getDataMapProvider(
-        CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
+    val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo)
+    val dataMapFactory = DataMapManager.get().getDataMapProvider(
+      carbonTable, dataMapSchema, session).getDataMapFactory
     var status = false
     val inputMetrics = new CarbonInputMetrics
     TaskMetricsMap.getInstance().registerThreadCallback()
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val segment = inputSplit.getAllSplits.get(0).getSegment
     inputMetrics.initBytesReadCallback(context, inputSplit)
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(attemptContext)
+    val format = createInputFormat(segment, attemptContext)
     val model = format.createQueryModel(inputSplit, attemptContext)
     // one query id per table
     model.setQueryId(queryId)
     model.setVectorReader(false)
-    model.setForcedDetailRawQuery(false)
     model.setRequiredRowId(true)
     var reader: CarbonRecordReader[Array[Object]] = null
     var refresher: DataMapBuilder = null
     try {
-      reader = new CarbonRecordReader(
-        model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
+      val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
--- End diff --

please format the code properly
--- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200965779

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala ---
@@ -163,38 +268,51 @@ class IndexDataMapRebuildRDD[K, V](
   override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val dataMapFactory =
-      DataMapManager.get().getDataMapProvider(
-        CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory
+    val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo)
+    val dataMapFactory = DataMapManager.get().getDataMapProvider(
+      carbonTable, dataMapSchema, session).getDataMapFactory
     var status = false
     val inputMetrics = new CarbonInputMetrics
     TaskMetricsMap.getInstance().registerThreadCallback()
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val segment = inputSplit.getAllSplits.get(0).getSegment
     inputMetrics.initBytesReadCallback(context, inputSplit)
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(attemptContext)
+    val format = createInputFormat(segment, attemptContext)
     val model = format.createQueryModel(inputSplit, attemptContext)
     // one query id per table
     model.setQueryId(queryId)
     model.setVectorReader(false)
-    model.setForcedDetailRawQuery(false)
     model.setRequiredRowId(true)
     var reader: CarbonRecordReader[Array[Object]] = null
     var refresher: DataMapBuilder = null
    try {
-      reader = new CarbonRecordReader(
-        model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
+      val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
--- End diff --

fixed
--- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200965829

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala ---
@@ -141,9 +141,10 @@ class RestructureResultImpl extends RestructureResult[Int, Boolean] {
 }

 trait RefreshResult[K, V] extends Serializable {
-  def getKey(key: String, value: Boolean): (K, V)
+  def getKey(key: String, value: (String, Boolean)): (K, V)
--- End diff --

comments added to describe the structure
--- |