Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226909139 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like var directScan: Boolean = false. Current flows would not be impacted. Can be enabled using scanRDD.directScan = true --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226909276 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like var directScan: Boolean = false. Current flows would not be impacted. Can be enabled using scanRDD.directScan = true --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226909556 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like var directScan: Boolean = false. Current flows would not be impacted. Can be enabled using scanRDD.directScan = true --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226909935 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like var directScan: Boolean = false. Current flows would not be impacted. Can be enabled using scanRDD.directScan = true --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226910685 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226910752 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226911011 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226912452 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226912513 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r226943171 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- Actually directScan can be passed through constructor like ```var directScan: Boolean = false```. Current flows would not be impacted and can be enabled using scanRDD.directScan = true instead of this method. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227702948 --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java --- @@ -1530,6 +1530,12 @@ public int getSortMemorySpillPercentage() { return spillPercentage; } + public boolean getPushRowFiltersForVector() { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227703819 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -228,9 +230,12 @@ class CarbonScanRDD[T: ClassTag]( statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) statisticRecorder.recordStatisticsForDriver(statistic, queryId) statistic = new QueryStatistic() - val carbonDistribution = CarbonProperties.getInstance().getProperty( + var carbonDistribution = CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) + if (directScan) { + carbonDistribution = CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES + } --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227704173 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -748,4 +754,8 @@ class CarbonScanRDD[T: ClassTag]( vectorReader = boolean } + // TODO find the better way set it. + def setDirectScanSupport(boolean: Boolean): Unit = { --- End diff -- It is a private variable --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227707418 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -337,19 +340,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { metadata, needDecoder, updateRequestedColumns.asInstanceOf[Seq[Attribute]]) - filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) + // Check whether spark should handle row filters in case of vector flow. + if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan] + && !hasDictionaryFilterCols) { --- End diff -- In case of the global dictionary if it has the filter then it needs to decode all data before applying the filter in spark's side. So we should disable this option in case of filters on global dictionary. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227708688 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -374,10 +410,27 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { metadata, needDecoder, updateRequestedColumns.asInstanceOf[Seq[Attribute]]) - execution.ProjectExec( - updateRequestedColumnsFunc(updatedProjects, table, - needDecoder).asInstanceOf[Seq[NamedExpression]], - filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)) + // Check whether spark should handle row filters in case of vector flow. + if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan] + && !implictsExisted && !hasDictionaryFilterCols) { --- End diff -- updated in comment --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2818 @jackylk @xuchuanyin For testing purposes we are making it configurable. Once it is proven that this path can be used for all types of queries, we can remove this property or default it to false ---
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227709784 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -228,9 +230,12 @@ class CarbonScanRDD[T: ClassTag]( statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) statisticRecorder.recordStatisticsForDriver(statistic, queryId) statistic = new QueryStatistic() - val carbonDistribution = CarbonProperties.getInstance().getProperty( + var carbonDistribution = CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) + if (directScan) { --- End diff -- Not all scenarios it works as of now, In IUD cases it still uses old flow --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2818#discussion_r227709881 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -337,19 +340,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { metadata, needDecoder, updateRequestedColumns.asInstanceOf[Seq[Attribute]]) - filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) + // Check whether spark should handle row filters in case of vector flow. + if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan] + && !hasDictionaryFilterCols) { + // Here carbon only do page pruning and row level pruning will be done by spark. + scan.inputRDDs().head match { + case rdd: CarbonScanRDD[InternalRow] => + rdd.setDirectScanSupport(true) + case _ => + } + filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_, scan)) + .getOrElse(scan) + } else { + filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) + } } else { var newProjectList: Seq[Attribute] = Seq.empty + var implictsExisted = false --- End diff -- ok, added comment --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2818 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/988/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2818 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9254/ --- |
Free forum by Nabble | Edit this page |