kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480273615

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {

Review comment:
Is `filterAttributes` the same as `originalFilterAttributes`? The code looks to be the same.
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480276719

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation

Review comment:
Why not use `indexTableRelation.carbonRelation.databaseName`?
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480285525

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+          originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)

Review comment:
Use `indexTableRelation.carbonTable` to get `indexCarbonTable`.
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480286695

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+          originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      var allColumnsFound = true

Review comment:
Use `forall` to check whether all the columns exist or not.
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480289433

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -58,6 +58,16 @@ object NodeType extends Enumeration {
  */
 class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
 
+  // to store the sort node per query
+  var sortNodeForPushDown: Sort = _
+
+  // to store the limit literal per query
+  var limitLiteral: Literal = _
+
+  // by default do not push down notNull filter,
+  // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+  var pushDownNotNullFilter: Boolean = _

Review comment:
Why not keep these as local variables in `transformFilterToJoin` and pass them to `rewritePlanForSecondaryIndex()`?
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-683964482

Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3934/
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-683964722

Build Success with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2192/
ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481013721

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -824,6 +904,57 @@ hunk as above, ending at the `CarbonEnv.getCarbonTable(...)` line)

Review comment:
`indexTableRelation.carbonTable` will have the main table, hence this code.
ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481024238

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -824,6 +904,57 @@ hunk as above, ending at the `CarbonEnv.getCarbonTable(...)` line)

Review comment:
This part of the code is taken from the existing code. Let me rename it here.
ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481146873

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -824,6 +904,57 @@ hunk as above, ending at `val filterAttributes = filter.condition collect {`)

Review comment:
Agreed, it was overlooked, I guess; we cannot compare here. I moved this comparison into `createIndexFilterDataFrame`, where I decide `needPushDown`.

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(same hunk, ending at the `val databaseName = ...` line)

Review comment:
Done.

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(same hunk, ending at `var allColumnsFound = true`)

Review comment:
Done.

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -58,6 +58,16 @@ hunk as above)

Review comment:
Because too many functions would need to change to pass the arguments. I used default arguments and changed the required places now, so it is a local variable now.
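A hedged sketch of the default-argument approach described in the last reply (toy names, not the PR's actual signatures): only the call sites that care about push-down pass the extra argument; every other caller stays unchanged.

```scala
object DefaultArgsSketch {
  // Existing callers keep calling rewritePlan(plan); only the order-by-limit
  // path passes the extra push-down state.
  def rewritePlan(plan: String,
      limit: Option[Int] = None,
      pushDownNotNullFilter: Boolean = false): String = {
    val withNotNull = if (pushDownNotNullFilter) s"IsNotNull -> $plan" else plan
    limit.fold(withNotNull)(n => s"Limit($n) -> $withNotNull")
  }

  def main(args: Array[String]): Unit = {
    println(rewritePlan("Join(SI, main)")) // old call site, unchanged
    println(rewritePlan("Join(SI, main)", Some(10), pushDownNotNullFilter = true))
  }
}
```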
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481168720

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -824,6 +926,42 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Boolean = {
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = parentTableRelation.carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 1. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1) {
+      // 2. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      if (sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != null }) {

Review comment:
Directly return `sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != null }`.
kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481170533

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala

@@ -684,21 +730,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
     val transformChild = false
     var addProjection = needProjection
+    // to store the sort node per query
+    var sortNodeForPushDown: Sort = null
+    // to store the limit literal per query
+    var limitLiteral: Literal = null
+    // by default do not push down notNull filter,
+    // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+    var pushDownNotNullFilter: Boolean = false
     val transformedPlan = transformPlan(plan, {
-      case union@Union(children) =>
+      case union@Union(_) =>
         // In case of Union, Extra Project has to be added to the Plan. Because if left table is
         // pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
         addProjection = true
         (union, true)
-      case sort@Sort(order, global, plan) =>
+      case sort@Sort(_, _, _) =>
         addProjection = true
         (sort, true)
-      case filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation))
+      case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+        child match {
+          case filter: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if (p.child.isInstanceOf[Filter]) =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case limit@Limit(literal: Literal, _@Project(_, child)) if child.isInstanceOf[Sort] =>

Review comment:
If you use the following, then you will not have to check for `isInstanceOf` or cast the child to `Sort`:
`case limit@Limit(literal: Literal, _@Project(_, Sort(_, _, _)))`
ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481197993

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -684,21 +730,71 @@ hunk as above)

Review comment:
Done.

File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
(quotes the same @@ -824,6 +926,42 @@ hunk as above, ending at the `sortColumns.forall` check)

Review comment:
Yeah, done.
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-684993460

Build Success with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2213/
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-684994151

Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3953/
ajantha-bhat commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-685305774

@kunal642: The PR is ready. Please check and merge.
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687265069

Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3976/
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687265603

Build Failed with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2236/
ajantha-bhat commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687269685

retest this please
CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687324192

Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3978/