kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r481777679 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,146 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + def sortFilter(filter: Filter): (Filter, Int) = { + filter match { + case sources.And(left, right) => + filterMap.getOrElseUpdate("AND", List()) + if (left.references.toSeq == right.references.toSeq || + right.references.diff(left.references).length == 0) { + val sorted = sortFilter(left) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,146 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. 
+ * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + def sortFilter(filter: Filter): (Filter, Int) = { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
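As a reading aid for the reordering examples quoted above, here is a minimal sketch of the idea for the pure-AND case: flatten the tree, sort the leaves by the storage ordinal of the column each one references, and rebuild the conjunction with reduce, mirroring the diff's reduce(sources.And). The ordinal map and all names below are illustrative stand-ins, not code from the PR; in CarbonData the ordinal would come from CarbonTable's column metadata via getStorageOrdinal.

```scala
import org.apache.spark.sql.sources.{And, EqualTo, Filter}

object FilterReorderSketch {
  // Stand-in for getStorageOrdinal: maps a column name to its storage position.
  val storageOrdinal: Map[String, Int] = Map("col1" -> 0, "col2" -> 1, "col3" -> 2)

  // Flatten an AND-only tree into its leaf filters (compare collectSimilarExpressions).
  def collectAndLeaves(filter: Filter): Seq[Filter] = filter match {
    case And(left, right) => collectAndLeaves(left) ++ collectAndLeaves(right)
    case leaf => Seq(leaf)
  }

  // Sort the leaves by storage ordinal and rebuild the conjunction left to right.
  def reorderAndTree(filter: Filter): Filter = {
    val sorted = collectAndLeaves(filter).sortBy { leaf =>
      leaf.references.headOption.flatMap(storageOrdinal.get).getOrElse(-1)
    }
    sorted.reduce(And)
  }

  def main(args: Array[String]): Unit = {
    // And(And(col3, col1), col2) becomes And(And(col1, col2), col3), as in Example2.
    val original = And(And(EqualTo("col3", 1), EqualTo("col1", 2)), EqualTo("col2", 3))
    println(reorderAndTree(original))
  }
}
```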
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r481778448 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,146 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 Review comment: no, filter values can be different ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
akashrn5 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r481787443 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,164 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one then no need to reorder. + val sortedFilter = if (filter.references.toSet.size == 1) { + (filter, -1) + } else { + sortFilter(filter, filterMap, table) + } + // If filter has only AND/PR expression then sort the nodes globally using the filterMap. Review comment: ```suggestion // If filter has only AND/OR expression then sort the nodes globally using the filterMap. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-685435856 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3960/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-685438039 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2220/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
QiangCai commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r481924787 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,146 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. Review comment: agree with Ajantha, better to add a configuration to control it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
akashrn5 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r481925690 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,164 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one then no need to reorder. + val sortedFilter = if (filter.references.toSet.size == 1) { + (filter, -1) + } else { + sortFilter(filter, filterMap, table) + } + // If filter has only AND/PR expression then sort the nodes globally using the filterMap. + // Else sort the subnodes individually + if (!filterMap.contains("OR") && filterMap.contains("AND") && filterMap("AND").nonEmpty) { + val sortedFilterAndOrdinal = filterMap("AND").sortBy(_._2) + (sortedFilterAndOrdinal.map(_._1).reduce(sources.And), sortedFilterAndOrdinal.head._2) + } else if (!filterMap.contains("AND") && filterMap.contains("OR") && + filterMap("OR").nonEmpty) { + val sortedFilterAndOrdinal = filterMap("OR").sortBy(_._2) + (sortedFilterAndOrdinal.map(_._1).reduce(sources.Or), sortedFilterAndOrdinal.head._2) + } else { + sortedFilter + } + } + + def sortFilter(filter: Filter, filterMap: mutable.HashMap[String, List[(Filter, Int)]], + table: CarbonTable): (Filter, Int) = { + filter match { + case sources.And(left, right) => Review comment: as we can see, both `case sources.And(left, right)` and `case sources.Or(left, right)` has same implementation except the filter names, and traversal also same, i think if we can extract the complete case implementation to method and pass filter types to reduce code and make more simple. can you just see if its possible? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [hidden email] |
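To make the suggestion concrete, one possible shape for such an extraction is sketched below: both the And and Or cases delegate to a single helper that takes the binary constructor as a parameter. This is only an illustration under simplified assumptions (it ignores the filterMap bookkeeping and the equal-references special case in the diff); leafOrdinal stands in for getStorageOrdinal, and none of these names are from the PR.

```scala
import org.apache.spark.sql.sources.{And, Filter, Or}

object SortFilterRefactorSketch {
  // Recursively sort a filter tree; `leafOrdinal` is a stand-in for getStorageOrdinal.
  def sortFilter(filter: Filter, leafOrdinal: Filter => Int): (Filter, Int) = {

    // Shared handling for both binary nodes; `combine` is the And or Or constructor.
    def sortBinary(combine: (Filter, Filter) => Filter,
        left: Filter, right: Filter): (Filter, Int) = {
      val (sortedLeft, leftOrdinal) = sortFilter(left, leafOrdinal)
      val (sortedRight, rightOrdinal) = sortFilter(right, leafOrdinal)
      // Keep the child that touches the earlier-stored column on the left.
      if (leftOrdinal <= rightOrdinal) {
        (combine(sortedLeft, sortedRight), leftOrdinal)
      } else {
        (combine(sortedRight, sortedLeft), rightOrdinal)
      }
    }

    filter match {
      case And(left, right) => sortBinary(And, left, right)
      case Or(left, right) => sortBinary(Or, left, right)
      case leaf => (leaf, leafOrdinal(leaf))
    }
  }
}
```

With this shape, the two branches of the pattern match collapse to one line each, which is roughly what the reviewer is asking the author to evaluate.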
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r482912253 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ########## @@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation( projects: Seq[NamedExpression], filters: Array[Filter], partitions: Seq[PartitionSpec]): RDD[InternalRow] = { - val filterExpression: Option[Expression] = filters.flatMap { filter => + val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_, carbonTable)).sortBy(_._2) Review comment: Not in this PR; many parts of the implementation have to be rewritten, and case classes have to be added to replace the Tuple. I will do it in a separate PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r482926526 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,146 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. Review comment: added a property which can be disabled. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r482926591 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,164 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one then no need to reorder. + val sortedFilter = if (filter.references.toSet.size == 1) { + (filter, -1) + } else { + sortFilter(filter, filterMap, table) + } + // If filter has only AND/PR expression then sort the nodes globally using the filterMap. Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r482936991 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,164 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one then no need to reorder. + val sortedFilter = if (filter.references.toSet.size == 1) { + (filter, -1) + } else { + sortFilter(filter, filterMap, table) + } + // If filter has only AND/PR expression then sort the nodes globally using the filterMap. + // Else sort the subnodes individually + if (!filterMap.contains("OR") && filterMap.contains("AND") && filterMap("AND").nonEmpty) { + val sortedFilterAndOrdinal = filterMap("AND").sortBy(_._2) + (sortedFilterAndOrdinal.map(_._1).reduce(sources.And), sortedFilterAndOrdinal.head._2) + } else if (!filterMap.contains("AND") && filterMap.contains("OR") && + filterMap("OR").nonEmpty) { + val sortedFilterAndOrdinal = filterMap("OR").sortBy(_._2) + (sortedFilterAndOrdinal.map(_._1).reduce(sources.Or), sortedFilterAndOrdinal.head._2) + } else { + sortedFilter + } + } + + def sortFilter(filter: Filter, filterMap: mutable.HashMap[String, List[(Filter, Int)]], + table: CarbonTable): (Filter, Int) = { + filter match { + case sources.And(left, right) => Review comment: done some changes regarding this..please check ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-686524218 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3971/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-686530492 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2231/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
QiangCai commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r483365586 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ########## @@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation( projects: Seq[NamedExpression], filters: Array[Filter], partitions: Seq[PartitionSpec]): RDD[InternalRow] = { - val filterExpression: Option[Expression] = filters.flatMap { filter => + val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_, carbonTable)).sortBy(_._2) Review comment: create new method CarbonFilters.reorderFilters and check CarbonProperties.isFilterReorderingEnabled once ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ########## @@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation( projects: Seq[NamedExpression], filters: Array[Filter], partitions: Seq[PartitionSpec]): RDD[InternalRow] = { - val filterExpression: Option[Expression] = filters.flatMap { filter => + val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_, carbonTable)).sortBy(_._2) + val filterExpression: Option[Expression] = reorderedFilter.flatMap { tuple => Review comment: case (filter, _) => ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,148 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one or the user has disabled reordering then no need to reorder. + if (filter.references.toSet.size == 1 || + !CarbonProperties.isFilterReorderingEnabled) { Review comment: no need checking CarbonProperties.isFilterReorderingEnabled in each CarbonFilters.reorderFilter ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
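For illustration, the wrapper asked for here might look like the sketch below: the reordering switch is consulted once for the whole predicate array, and the tuples are destructured with case (filter, _) rather than accessed through _._1/_._2. The parameter names and signatures are assumptions; reorderingEnabled stands in for CarbonProperties.isFilterReorderingEnabled and reorderFilter for CarbonFilters.reorderFilter(_, carbonTable).

```scala
import org.apache.spark.sql.sources.Filter

object ReorderFiltersSketch {
  // Reorder a whole predicate array, checking the enable switch only once.
  def reorderFilters(filters: Array[Filter],
      reorderingEnabled: Boolean,
      reorderFilter: Filter => (Filter, Int)): Array[Filter] = {
    if (!reorderingEnabled) {
      filters
    } else {
      filters
        .map(reorderFilter)                       // attach each filter's storage ordinal
        .sortBy { case (_, ordinal) => ordinal }  // earliest-stored column first
        .map { case (filter, _) => filter }       // drop the ordinal once sorted
    }
  }
}
```

CarbonDatasourceHadoopRelation.buildScan could then call such a method once per scan instead of re-checking the property inside every reorderFilter invocation.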
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r483419204 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ########## @@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation( projects: Seq[NamedExpression], filters: Array[Filter], partitions: Seq[PartitionSpec]): RDD[InternalRow] = { - val filterExpression: Option[Expression] = filters.flatMap { filter => + val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_, carbonTable)).sortBy(_._2) + val filterExpression: Option[Expression] = reorderedFilter.flatMap { tuple => Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r483419324 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ########## @@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation( projects: Seq[NamedExpression], filters: Array[Filter], partitions: Seq[PartitionSpec]): RDD[InternalRow] = { - val filterExpression: Option[Expression] = filters.flatMap { filter => + val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_, carbonTable)).sortBy(_._2) Review comment: handled in a little different way, please check again ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ########## @@ -373,6 +375,148 @@ object CarbonFilters { val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) getPartitions(partitionFilters, sparkSession, carbonTable) } + + def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = { + val column = filter.references.map(carbonTable.getColumnByName) + if (column.isEmpty) { + -1 + } else { + if (column.head.isDimension) { + column.head.getOrdinal + } else { + column.head.getOrdinal + carbonTable.getAllDimensions.size() + } + } + } + + def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = { + filter match { + case sources.And(left, right) => + collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table) + case sources.Or(left, right) => collectSimilarExpressions(left, table) ++ + collectSimilarExpressions(right, table) + case others => Seq((others, getStorageOrdinal(others, table))) + } + } + + /** + * This method will reorder the filter based on the Storage Ordinal of the column references. + * + * Example1: + * And And + * Or And => Or And + * col3 col1 col2 col1 col1 col3 col1 col2 + * + * **Mixed expression filter reordered locally, but wont be reordered globally.** + * + * Example2: + * And And + * And And => And And + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * Or Or + * Or Or => Or Or + * col3 col1 col2 col1 col1 col1 col2 col3 + * + * **Similar expression filters are reordered globally** + * + * @param filter the filter expression to be reordered + * @return The reordered filter with the current ordinal + */ + def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = { + val filterMap = mutable.HashMap[String, List[(Filter, Int)]]() + // If the filter size is one or the user has disabled reordering then no need to reorder. + if (filter.references.toSet.size == 1 || + !CarbonProperties.isFilterReorderingEnabled) { Review comment: agree, good catch ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-687009718 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3974/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
akashrn5 commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r483475663 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ########## @@ -2105,7 +2105,7 @@ public int getMaxSIRepairLimit(String dbName, String tableName) { // Check if user has enabled/disabled the use of property for the current db and table using // the set command String thresholdValue = getSessionPropertyValue( - CarbonCommonConstants.CARBON_LOAD_SI_REPAIR + "." + dbName + "." + tableName); Review comment: this changed by mistake i think ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA1 commented on pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#issuecomment-687014802 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2234/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
ajantha-bhat commented on a change in pull request #3902: URL: https://github.com/apache/carbondata/pull/3902#discussion_r484171099 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2512,4 +2512,10 @@ private CarbonCommonConstants() { * property which defines the presto query default value */ public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false"; + + @CarbonProperty(dynamicConfigurable = true) Review comment: please update in the document also ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |