[GitHub] [carbondata] ajantha-bhat opened a new pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries


[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480273615



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {

Review comment:
       Is `filterAttributes` the same as `originalFilterAttributes`? The code looks identical.
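As a toy illustration of the point (plain Scala stand-ins, not the real Catalyst `TreeNode.collect` API or node types), two traversals applying the same partial function always produce the same value, so one `val` suffices:

```scala
// Hypothetical miniature expression tree, for illustration only.
sealed trait Expr
case class AttributeRef(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// A recursive collect, analogous to what both vals in the PR compute.
def collectAttrs(e: Expr): Seq[String] = e match {
  case AttributeRef(n) => Seq(n.toLowerCase)
  case And(l, r)       => collectAttrs(l) ++ collectAttrs(r)
}

val condition = And(AttributeRef("Name"), AttributeRef("City"))

// Running the identical traversal twice yields the same result,
// so originalFilterAttributes and filterAttributes collapse into one val.
val once  = collectAttrs(condition)
val twice = collectAttrs(condition)
assert(once == twice)
```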
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480276719



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation

Review comment:
       Why not use `indexTableRelation.carbonRelation.databaseName`?





[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480285525



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+        originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)

Review comment:
       Use `indexTableRelation.carbonTable` to get `indexCarbonTable`.





[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480286695



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+        originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      var allColumnsFound = true

Review comment:
       Use `forall` to check whether all the columns exist.
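The suggestion, sketched in plain Scala with a hypothetical lookup standing in for `indexCarbonTable.getColumnByName`: `forall` expresses the same check as the mutable flag, short-circuits on the first miss, and removes the `var`.

```scala
// Hypothetical column lookup, for illustration only.
val indexColumns = Set("name", "city")
def getColumnByName(col: String): Option[String] =
  if (indexColumns.contains(col)) Some(col) else None

val sortColumns = Set("name", "city")

// Before: a mutable flag updated inside a loop.
var allColumnsFound = true
for (col <- sortColumns) {
  if (getColumnByName(col).isEmpty) allColumnsFound = false
}

// After: forall states the condition directly and stops at the first failure.
val allFound = sortColumns.forall(col => getColumnByName(col).isDefined)
assert(allColumnsFound == allFound)
```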





[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r480289433



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -58,6 +58,16 @@ object NodeType extends Enumeration {
  */
 class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
 
+  // to store the sort node per query
+  var sortNodeForPushDown: Sort = _
+
+  // to store the limit literal per query
+  var limitLiteral : Literal = _
+
+  // by default do not push down notNull filter,
+  // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+  var pushDownNotNullFilter : Boolean = _

Review comment:
       Why not keep these as local variables in `transformFilterToJoin` and pass them to `rewritePlanForSecondaryIndex()`?
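The trade-off under discussion, sketched with hypothetical names (not the real optimizer classes): mutable fields on the optimizer instance can leak state from one query into the next, while locals threaded through default arguments keep each invocation self-contained without touching every call site.

```scala
// Hypothetical miniature of the two designs, for illustration only.
case class SortNode(cols: Seq[String])

class FieldBasedOptimizer {
  // State survives between calls: a stale value from query 1
  // can leak into the rewrite of query 2.
  var sortNodeForPushDown: SortNode = null
  def rewrite(): Option[SortNode] = Option(sortNodeForPushDown)
}

class LocalBasedOptimizer {
  // A default argument lets existing call sites stay unchanged
  // while new ones pass the per-query state explicitly.
  def rewrite(sortNode: Option[SortNode] = None): Option[SortNode] = sortNode
}

val fieldBased = new FieldBasedOptimizer
fieldBased.sortNodeForPushDown = SortNode(Seq("name"))
assert(fieldBased.rewrite().isDefined) // leftover state from an earlier "query"

val localBased = new LocalBasedOptimizer
assert(localBased.rewrite().isEmpty)   // nothing leaks: the default is None
assert(localBased.rewrite(Some(SortNode(Seq("name")))).isDefined)
```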





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-683964482


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3934/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-683964722


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2192/
   



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481013721



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+        originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)

Review comment:
       `indexTableRelation.carbonTable` holds the main table, hence this code.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481024238



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+        originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)

Review comment:
       This part was copied from the existing code. Let me rename it here.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481146873



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {

Review comment:
       Agreed, it was overlooked. We cannot compare here; I moved this comparison into `createIndexFilterDataFrame`, where `needPushDown` is decided.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +904,57 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Unit = {
+    // 1. check all the filter columns present in SI
+    val originalFilterAttributes = filter.condition collect {
+      case attr: AttributeReference =>
+        attr.name.toLowerCase
+    }
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val indexTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(indexTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = filter.child.asInstanceOf[LogicalRelation].relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 2. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1 &&
+        filterAttributes.intersect(originalFilterAttributes).size ==
+        originalFilterAttributes.size) {
+      // 3. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      var allColumnsFound = true

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -58,6 +58,16 @@ object NodeType extends Enumeration {
  */
 class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
 
+  // to store the sort node per query
+  var sortNodeForPushDown: Sort = _
+
+  // to store the limit literal per query
+  var limitLiteral : Literal = _
+
+  // by default do not push down notNull filter,
+  // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+  var pushDownNotNullFilter : Boolean = _

Review comment:
       Because too many functions would need to change to pass these as arguments.
   
   I used default arguments and changed the required places, so they are local variables now.





[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481168720



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +926,42 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Boolean = {
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = parentTableRelation.carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 1. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size == 1) {
+      // 2. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      if (sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != null }) {

Review comment:
       Directly return `sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != null }` instead of wrapping it in an if.





[GitHub] [carbondata] kunal642 commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481170533



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -684,21 +730,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
     val transformChild = false
     var addProjection = needProjection
+    // to store the sort node per query
+    var sortNodeForPushDown: Sort = null
+    // to store the limit literal per query
+    var limitLiteral: Literal = null
+    // by default do not push down notNull filter,
+    // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+    var pushDownNotNullFilter: Boolean = false
     val transformedPlan = transformPlan(plan, {
-      case union@Union(children) =>
+      case union@Union(_) =>
         // In case of Union, Extra Project has to be added to the Plan. Because if left table is
         // pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
         addProjection = true
         (union, true)
-      case sort@Sort(order, global, plan) =>
+      case sort@Sort(_, _, _) =>
         addProjection = true
         (sort, true)
-      case filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation))
+      case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+        child match {
+          case filter: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if (p.child.isInstanceOf[Filter]) =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case limit@Limit(literal: Literal, _@Project(_, child)) if child.isInstanceOf[Sort] =>

Review comment:
       If you use the following, you will not have to check `isInstanceOf` or cast the child to `Sort`:
   
   `case limit@Limit(literal: Literal, _@Project(_, Sort(_, _)))`
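The idea, sketched with simplified case classes (hypothetical node shapes, not Catalyst's real arities): nesting the extractor inside the pattern binds the `Sort` node directly, so the guard, the `isInstanceOf` check, and the cast all disappear.

```scala
// Simplified stand-ins for the logical-plan nodes, for illustration only.
sealed trait Plan
case class Sort(child: Plan) extends Plan
case class Project(child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan
case object Leaf extends Plan

// Before: a guard plus isInstanceOf plus asInstanceOf.
def matchVerbose(p: Plan): Option[Sort] = p match {
  case Limit(_, Project(child)) if child.isInstanceOf[Sort] =>
    Some(child.asInstanceOf[Sort])
  case _ => None
}

// After: the nested pattern matches and binds the Sort node in one step.
def matchNested(p: Plan): Option[Sort] = p match {
  case Limit(_, Project(sort @ Sort(_))) => Some(sort)
  case _ => None
}

val plan = Limit(10, Project(Sort(Leaf)))
assert(matchVerbose(plan) == matchNested(plan))
```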





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481197993



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -684,21 +730,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
     val transformChild = false
     var addProjection = needProjection
+    // to store the sort node per query
+    var sortNodeForPushDown: Sort = null
+    // to store the limit literal per query
+    var limitLiteral: Literal = null
+    // by default do not push down notNull filter,
+    // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+    var pushDownNotNullFilter: Boolean = false
     val transformedPlan = transformPlan(plan, {
-      case union@Union(children) =>
+      case union@Union(_) =>
         // In case of Union, Extra Project has to be added to the Plan. Because if left table is
         // pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
         addProjection = true
         (union, true)
-      case sort@Sort(order, global, plan) =>
+      case sort@Sort(_, _, _) =>
         addProjection = true
         (sort, true)
-      case filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation))
+      case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+        child match {
+          case filter: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if (p.child.isInstanceOf[Filter]) =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case limit@Limit(literal: Literal, _@Project(_, child)) if child.isInstanceOf[Sort] =>

Review comment:
       done
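
The plan shapes the hunk above matches can be sketched with simplified stand-in nodes (hypothetical case classes standing in for Spark Catalyst's `Limit`/`Sort`/`Project`/`Filter`; not the CarbonData or Catalyst API):

```scala
// Minimal stand-ins for Catalyst logical plan nodes, for illustration only.
sealed trait Plan
case object Leaf extends Plan
case class Filter(child: Plan) extends Plan
case class Project(child: Plan) extends Plan
case class Sort(child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan

// Mirrors the two shapes the optimizer looks for before pushing down
// order by + limit: Limit(Sort(Filter)) and Limit(Sort(Project(Filter))).
// Returns the Filter node beneath the Limit/Sort chain when the shape matches.
def filterForPushDown(plan: Plan): Option[Filter] = plan match {
  case Limit(_, Sort(f: Filter))          => Some(f)
  case Limit(_, Sort(Project(f: Filter))) => Some(f)
  case _                                  => None
}
```

Any other shape (for example a bare `Sort` without a `Limit` on top) falls through to the default case and is left untouched.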

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +926,42 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Boolean = {
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = parentTableRelation.carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 1. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.size == 1) {
+      // 2. check if all the sort columns are present in the SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      if (sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != null }) {

Review comment:
       yeah. done
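
The eligibility check in this hunk boils down to two conditions: exactly one enabled SI table covers the filter columns, and every sort column exists in that SI table. A distilled, self-contained sketch (the function name and the Map-based SI metadata are illustrative assumptions, not the CarbonData API):

```scala
// Hypothetical condensed version of checkIfPushDownOrderByLimitAndNotNullFilter.
// siTables maps index table name -> (columns in the index, isSITableEnabled).
def canPushDownOrderByLimit(
    filterColumns: Set[String],
    sortColumns: Set[String],
    siTables: Map[String, (Set[String], Boolean)]): Boolean = {
  // keep only enabled SI tables whose columns cover all filter columns
  val matching = siTables.filter { case (_, (cols, enabled)) =>
    enabled && filterColumns.subsetOf(cols)
  }
  // push down only when exactly one SI matches and it also covers the sort columns
  matching.size == 1 && sortColumns.subsetOf(matching.head._2._1)
}
```

If more than one SI matches the filter columns, or a sort column is missing from the matching index, the optimizer keeps the original plan and the order by/limit runs on the main table.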




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-684993460


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2213/
   





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-684994151


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3953/
   





[GitHub] [carbondata] ajantha-bhat commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-685305774


   @kunal642 : PR is ready. please check and merge





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687265069


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3976/
   





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687265603


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2236/
   





[GitHub] [carbondata] ajantha-bhat commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687269685


   retest this please





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3861: [CARBONDATA-3922] Support order by limit push down for secondary index queries

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#issuecomment-687324192


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3978/
   



