GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/2818

[CARBONDATA-3011] Add carbon property to configure vector based row pruning push down

Added the configuration below to enable or disable row-filter push down for the vector flow:

```
carbon.push.rowfilters.for.vector
```

When enabled, row filters are handled completely by carbon in the vector flow. When disabled, carbon does only page-level pruning and Spark performs the row-level filtering for vector reads. There is no change in the flow for non-vector queries. The default value is `true`.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done. Please provide details on:
  - Whether new unit test cases have been added, or why no new tests are required.
  - How it was tested. Please attach the test report.
  - Is it a performance-related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata perf-config-prop

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2818.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2818

----

commit 3ec788d41bdd02117e187fa951e0193187415926
Author: ravipesala <ravi.pesala@...>
Date: 2018-10-16T05:02:18Z

    Add carbon property to configure vector based row pruning push down

----
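To make the flag's semantics concrete, here is a minimal, self-contained sketch of which engine performs row-level filtering for a given read. The type and function names are illustrative stand-ins, not Carbon's actual internals:

```scala
// Hypothetical model of the new flag's effect on a read path.
sealed trait RowFilterOwner
case object CarbonFiltersRows extends RowFilterOwner // complete row-filter push down
case object SparkFiltersRows extends RowFilterOwner  // carbon does page pruning only

// Non-vector queries are unchanged: carbon always handles their filters.
def rowFilterOwner(pushRowFiltersForVector: Boolean, vectorRead: Boolean): RowFilterOwner =
  if (!vectorRead || pushRowFiltersForVector) CarbonFiltersRows
  else SparkFiltersRows
```

With the default `carbon.push.rowfilters.for.vector=true`, carbon handles the filters in every case; only setting it to `false` on a vector read hands row-level filtering to Spark.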
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2818

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/797/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2818

Build Success with Spark 2.2.1. Please check CI: http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/994/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2818

Build Success with Spark 2.3.1. Please check CI: http://136.243.101.176:8080/job/carbondataprbuilder2.3/9062/

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r225785705

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---

```
@@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag](
     vectorReader = boolean
   }

+  // TODO find the better way set it.
+  def setDirectScanSupport(boolean: Boolean): Unit = {
```

--- End diff --

Give a meaningful name to the variable `boolean`.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r225787305

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---

```
@@ -337,19 +340,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      // Check whether spark should handle row filters in case of vector flow.
+      if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+          && !hasDictionaryFilterCols) {
```

--- End diff --

Please cross-check whether the `hasDictionaryFilterCols` check is required for setting directScan to true, because directScan and the vector reader are co-related: both use the vector flow. So if the vector flow is enabled then directScan can automatically be enabled, and vice-versa. If the check is required, please add a detailed comment to explain it.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r225788984

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---

```
@@ -374,10 +410,27 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      execution.ProjectExec(
-        updateRequestedColumnsFunc(updatedProjects, table,
-          needDecoder).asInstanceOf[Seq[NamedExpression]],
-        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+      // Check whether spark should handle row filters in case of vector flow.
+      if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+          && !implictsExisted && !hasDictionaryFilterCols) {
```

--- End diff --

Same comment as above about the `hasDictionaryFilterCols` check.

---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r225785597

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---

```
@@ -228,9 +230,12 @@ class CarbonScanRDD[T: ClassTag](
     statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
     statisticRecorder.recordStatisticsForDriver(statistic, queryId)
     statistic = new QueryStatistic()
-    val carbonDistribution = CarbonProperties.getInstance().getProperty(
+    var carbonDistribution = CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    if (directScan) {
+      carbonDistribution = CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
+    }
```

--- End diff --

We can use `val` in place of `var` and write the code as below:

```
val carbonDistribution = if (directScan) {
  CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
} else {
  CarbonProperties.getInstance().getProperty(
    CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
    CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
}
```

---
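The refactor suggested above relies on Scala's `if` being an expression, so the branch result can initialize an immutable `val` directly. A self-contained sketch of the same idiom, with stand-in strings in place of the real `CarbonCommonConstants` values:

```scala
// Stand-ins for CARBON_TASK_DISTRIBUTION_MERGE_FILES and the configured/default
// property lookup (hypothetical values, not the real constants).
val mergeFilesDistribution = "merge_small_files"
val defaultDistribution = "block"

def chooseDistribution(directScan: Boolean, configured: Option[String]): String =
  // `if` is an expression: no `var` plus later reassignment is needed.
  if (directScan) mergeFilesDistribution
  else configured.getOrElse(defaultDistribution)
```

Besides avoiding mutation, the `val` form makes it clear at a glance that `directScan` wins over any configured value.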
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r225784934

--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---

```
@@ -1530,6 +1530,12 @@ public int getSortMemorySpillPercentage() {
     return spillPercentage;
   }

+  public boolean getPushRowFiltersForVector() {
```

--- End diff --

Change the method name to `pushRowFiltersForVector`.

---
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2818

Why is the default value set to `true`?

---
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2818

We should also think about making this parameter configurable using the SET command...

---
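If SET support is added, session-level usage might look like the following. This is hypothetical: it assumes the property gets registered as session-settable, whereas in this PR it is only a carbon.properties entry:

```
SET carbon.push.rowfilters.for.vector=false
```

A session-level override would let users switch the pruning behavior per workload without restarting with a changed carbon.properties file.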
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2818

@ravipesala What is the benefit or cost of configuring this parameter? When should the user enable or disable it?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226323717

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---

```
@@ -228,9 +230,12 @@ class CarbonScanRDD[T: ClassTag](
     statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
     statisticRecorder.recordStatisticsForDriver(statistic, queryId)
     statistic = new QueryStatistic()
-    val carbonDistribution = CarbonProperties.getInstance().getProperty(
+    var carbonDistribution = CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    if (directScan) {
```

--- End diff --

I think there are too many paths. If directScan always saves memory and copies less, I suggest we always use it; then there is no need to add one more configuration.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226324001

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---

```
@@ -337,19 +340,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      // Check whether spark should handle row filters in case of vector flow.
+      if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+          && !hasDictionaryFilterCols) {
+        // Here carbon only do page pruning and row level pruning will be done by spark.
+        scan.inputRDDs().head match {
+          case rdd: CarbonScanRDD[InternalRow] =>
+            rdd.setDirectScanSupport(true)
+          case _ =>
+        }
+        filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_, scan))
+          .getOrElse(scan)
+      } else {
+        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      }
     } else {
       var newProjectList: Seq[Attribute] = Seq.empty
+      var implictsExisted = false
```

--- End diff --

Why is this required? Can you add a comment?

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226908730

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---

```
@@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag](
     vectorReader = boolean
   }

+  // TODO find the better way set it.
+  def setDirectScanSupport(boolean: Boolean): Unit = {
```

--- End diff --

Actually, directScan can be passed through the constructor, like `var directScan: Boolean = false`. Current flows would not be impacted, and it can be enabled using `scanRDD.directScan = true`.

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226907173

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---

```
@@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag](
     vectorReader = boolean
   }

+  // TODO find the better way set it.
+  def setDirectScanSupport(boolean: Boolean): Unit = {
```

--- End diff --

We can make directScan a constructor parameter, like `var directScan: Boolean = false`. Other flows can set it via the constructor and this method would not be required; `scanRdd.directScan = true` would do the job.

---
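The constructor-parameter approach suggested here can be sketched as follows. `ScanRddSketch` is an illustrative stand-in for `CarbonScanRDD`, not the real class:

```scala
// A default value keeps every existing call site source-compatible; new code
// opts in either at construction time or by assigning the public var.
class ScanRddSketch(var directScan: Boolean = false)

val existing = new ScanRddSketch()                 // current flows: stays false
val direct = new ScanRddSketch(directScan = true)  // new flow opts in up front
existing.directScan = true                         // or enable after construction
```

Because the default is `false`, no existing constructor call needs to change, which is why the dedicated `setDirectScanSupport` method becomes unnecessary.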