[GitHub] [carbondata] Indhumathi27 opened a new pull request #4127: [WIP] Geo spatial improvements

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] Indhumathi27 opened a new pull request #4127: [WIP] Geo spatial improvements

GitBox

Indhumathi27 opened a new pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127


    ### Why is this PR needed?
   
   
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-829228356


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3542/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-829280226


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5289/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-829421827


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3545/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-829424392


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5290/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-830127572


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3549/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [WIP] Geo spatial improvements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-830132025


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5294/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#discussion_r626640238



##########
File path: docs/spatial-index-guide.md
##########
@@ -106,18 +107,52 @@ Query with Polygon List UDF predicate
 select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276))', 'OR')
 ```
 
+or
+
+```
+select * from source_index where IN_POLYGON_LIST('select polygon from polyon_table', 'OR')

Review comment:
       may be here we have to mention how `polygon_table`  looks like to know schema of polygon column for better understanding of the example.
   same comments for down also.

##########
File path: geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
##########
@@ -55,7 +55,9 @@ public void processExpression() {
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
-        polygons.add(matchedStr);
+        if (!(matchedStr == null || matchedStr.isEmpty())) {

Review comment:
       please use `!StringUtils.isEmpty`

##########
File path: geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
##########
@@ -40,22 +40,41 @@
 
   private String opType;
 
+  /**
+   * If range start's with RANGELIST_REG_EXPRESSION or not
+   */
+  private boolean hasPattern = true;

Review comment:
       `computeRange` may be more suitable name for this variable.

##########
File path: docs/spatial-index-guide.md
##########
@@ -1,3 +1,4 @@
+

Review comment:
       revert empty line

##########
File path: docs/spatial-index-guide.md
##########
@@ -106,18 +107,52 @@ Query with Polygon List UDF predicate
 select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276))', 'OR')
 ```
 
+or
+
+```
+select * from source_index where IN_POLYGON_LIST('select polygon from polyon_table', 'OR')
+```
+
 Query with Polyline List UDF predicate
 
 ```
 select * from source_index where IN_POLYLINE_LIST('LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276), LINESTRING (116.260993 39.935276, 116.560993 39.935276, 116.560993 40.163503)', 65)
 ```
 
+or
+
+```
+select * from source_index where IN_POLYLINE_LIST('select polyLine from polyon_table', 65)
+```
+
 Query with Polygon Range List UDF predicate
 
 ```
 select * from source_index where IN_POLYGON_RANGE_LIST('RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400), RANGELIST (855279368852 855279368854, 855280799613 855280799615, 855282156200 855282157500)', 'OR')
 ```
 
+Query having Join on Spatial and Polygon table with Polygon Join UDF predicate
+
+```
+select sum(t1.col1), t2.poiId
+from spatial_table t1
+inner join
+(select polygon, poiId from polygon_table) t2

Review comment:
       please give table schema before example for better understanding.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
##########
@@ -30,6 +32,7 @@ object GeoUtilUDFs {
     sparkSession.udf.register("LatLngToGeoId", new LatLngToGeoIdUDF)
     sparkSession.udf.register("ToUpperLayerGeoId", new ToUpperLayerGeoIdUDF)
     sparkSession.udf.register("ToRangeList", new ToRangeListUDF)
+    sparkSession.udf.register("ToRangeListAsString", new ToRangeListAsStringUDF)

Review comment:
       these UDF are exposed to user right ? can we update in the document ?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -56,6 +65,104 @@ object DMLStrategy extends SparkStrategy {
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
         CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
+      case Join(left, right, joinType, condition)
+        if condition.isDefined && condition.get.isInstanceOf[ScalaUDF] &&
+           isPolygonUdfFilter(condition) =>

Review comment:
       ```suggestion
              isPolygonJoinUdfFilter(condition) =>
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(

Review comment:
       many things are common with BroadCastSIFilterPushJoin, we cannot extend the same class ?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
##########
@@ -38,6 +43,46 @@ class InPolygonUDF extends (String => Boolean) with Serializable {
   }
 }
 
+@InterfaceAudience.Internal
+class InPolygonJoinRangeListUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(geoId: String, polygonRanges: String): Boolean = {
+    if (polygonRanges == null || polygonRanges.equalsIgnoreCase("null")) {
+      return false
+    }
+    // parser and get the range list
+    var range: String = polygonRanges
+    val pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE)
+    val matcher = pattern.matcher(polygonRanges)
+    while ( { matcher.find }) {
+      val matchedStr = matcher.group
+      range = matchedStr
+    }
+    val ranges = PolygonRangeListExpression.getRangeListFromString(range)

Review comment:
       here we need to check against `null` for ranges? some places we check and some places we don't. (example `ToRangeListAsStringUDF`) can we make it uniform? If not required, remove from other places also. also maybe extract a common method for matcher and find if possible.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    buildSide: BuildSide,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan
+) extends BinaryExecNode with HashJoin {
+
+  override protected lazy val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  private lazy val inputCopy: Array[InternalRow] = {
+    getBuildPlan.map(_.copy()).collect().clone()
+  }
+
+  lazy val spatialTableRDD: Option[RDD[InternalRow]] = streamedPlan.collectFirst {
+    case scan: CarbonDataSourceScan => scan.inputRDDs().head
+  }
+
+  @transient private lazy val boundCondition: InternalRow => Boolean = {
+    if (condition.isDefined) {
+      newPredicate(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    } else {
+      (_: InternalRow) => true
+    }
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // get polygon rangeList from polygon table and add as IN_POLYGON_RANGE_LIST filter to the
+    // spatial table
+    addPolygonRangeListFilterToPlan(buildPlan, streamedPlan, inputCopy, condition)
+    // inner join spatial and polygon plan by applying in_polygon join filter
+    streamedPlan.execute().mapPartitionsInternal {
+      streamedIter =>
+        // get polygon table data rows
+        val buildRows = inputCopy
+        val joinedRow = new JoinedRow
+
+        streamedIter.flatMap { streamedRow =>
+          val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
+          // apply in_polygon_join filter
+          if (condition.isDefined) {
+            joinedRows.filter(boundCondition)
+          } else {
+            joinedRows
+          }
+        }
+    }
+  }
+
+  protected def getBuildPlan: RDD[InternalRow] = {
+    buildPlan match {
+      case c@CarbonBroadCastExchangeExec(_, _) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case _ => buildPlan.children.head match {
+        case c@CarbonBroadCastExchangeExec(_, _) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case _ => buildPlan.execute()
+      }
+    }
+  }
+}
+
+object BroadCastPolygonFilterPushJoin {

Review comment:
       also please add more comments for this class and explanation for each




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

Indhumathi27 commented on a change in pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#discussion_r627097420



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
##########
@@ -30,6 +32,7 @@ object GeoUtilUDFs {
     sparkSession.udf.register("LatLngToGeoId", new LatLngToGeoIdUDF)
     sparkSession.udf.register("ToUpperLayerGeoId", new ToUpperLayerGeoIdUDF)
     sparkSession.udf.register("ToRangeList", new ToRangeListUDF)
+    sparkSession.udf.register("ToRangeListAsString", new ToRangeListAsStringUDF)

Review comment:
       This udf is not exposed to user. This is for internal purpose only.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
##########
@@ -38,6 +43,46 @@ class InPolygonUDF extends (String => Boolean) with Serializable {
   }
 }
 
+@InterfaceAudience.Internal
+class InPolygonJoinRangeListUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(geoId: String, polygonRanges: String): Boolean = {
+    if (polygonRanges == null || polygonRanges.equalsIgnoreCase("null")) {
+      return false
+    }
+    // parser and get the range list
+    var range: String = polygonRanges
+    val pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE)
+    val matcher = pattern.matcher(polygonRanges)
+    while ( { matcher.find }) {
+      val matchedStr = matcher.group
+      range = matchedStr
+    }
+    val ranges = PolygonRangeListExpression.getRangeListFromString(range)

Review comment:
       NULL check is already handled here in line:49. Handled some refactoring to extract common code to new method.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(

Review comment:
        BroadCastPolygonFilterPushJoin implementation is not same as BroadCastSIFilterPushJoin in terms of filter and method definition. so, better to keep it seperate.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    buildSide: BuildSide,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan
+) extends BinaryExecNode with HashJoin {
+
+  override protected lazy val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  private lazy val inputCopy: Array[InternalRow] = {
+    getBuildPlan.map(_.copy()).collect().clone()
+  }
+
+  lazy val spatialTableRDD: Option[RDD[InternalRow]] = streamedPlan.collectFirst {
+    case scan: CarbonDataSourceScan => scan.inputRDDs().head
+  }
+
+  @transient private lazy val boundCondition: InternalRow => Boolean = {
+    if (condition.isDefined) {
+      newPredicate(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    } else {
+      (_: InternalRow) => true
+    }
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // get polygon rangeList from polygon table and add as IN_POLYGON_RANGE_LIST filter to the
+    // spatial table
+    addPolygonRangeListFilterToPlan(buildPlan, streamedPlan, inputCopy, condition)
+    // inner join spatial and polygon plan by applying in_polygon join filter
+    streamedPlan.execute().mapPartitionsInternal {
+      streamedIter =>
+        // get polygon table data rows
+        val buildRows = inputCopy
+        val joinedRow = new JoinedRow
+
+        streamedIter.flatMap { streamedRow =>
+          val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
+          // apply in_polygon_join filter
+          if (condition.isDefined) {
+            joinedRows.filter(boundCondition)
+          } else {
+            joinedRows
+          }
+        }
+    }
+  }
+
+  protected def getBuildPlan: RDD[InternalRow] = {
+    buildPlan match {
+      case c@CarbonBroadCastExchangeExec(_, _) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case _ => buildPlan.children.head match {
+        case c@CarbonBroadCastExchangeExec(_, _) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case _ => buildPlan.execute()
+      }
+    }
+  }
+}
+
+object BroadCastPolygonFilterPushJoin {

Review comment:
       done




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-833298184


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5308/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-833299831


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3563/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#issuecomment-833393531


   LGTM


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] asfgit closed pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

GitBox
In reply to this post by GitBox

asfgit closed pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127


   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]