[GitHub] [carbondata] VenuReddy2103 commented on a change in pull request #3436: [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with a nonschema dimension sort column. And added InPolygon UDF

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] VenuReddy2103 commented on a change in pull request #3436: [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with a nonschema dimension sort column. And added InPolygon UDF

GitBox
VenuReddy2103 commented on a change in pull request #3436: [CARBONDATA-3548]Geospatial Support: Modified to create and load the table with a nonschema dimension sort column. And added InPolygon UDF
URL: https://github.com/apache/carbondata/pull/3436#discussion_r373038067
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 ##########
 @@ -264,8 +308,142 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
           s"Carbon Implicit column ${col.column} is not allowed in" +
           s" column name while creating table")
       }
+    }
+  }
 
+  /**
+   * The method parses, validates and processes the index_handler property.
+   * @param tableProperties Table properties
+   * @param tableFields  Sequence of table fields
+   * @return <Seq[Field]> Sequence of index fields to add to table fields
+   */
+  private def processIndexProperty(tableProperties: mutable.Map[String, String],
+                                   tableFields: Seq[Field]): Seq[Field] = {
+    val option = tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val fields = ListBuffer[Field]()
+    if (option.isDefined) {
+      if (option.get.trim.isEmpty) {
+        throw new MalformedCarbonCommandException(
+          s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+            s"Option value is empty.")
+      }
+      option.get.split(",").map(_.trim).foreach { handler =>
+        /* Validate target column name */
+        if (tableFields.exists(_.column.equalsIgnoreCase(handler))) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+              s"handler: $handler must not match with any other column name in the table")
+        }
+        val TYPE = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.type"
+        val SOURCE_COLUMNS = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.sourcecolumns"
+        val SOURCE_COLUMN_TYPES
+                  = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.sourcecolumntypes"
+        val HANDLER_CLASS = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.class"
+        val HANDLER_INSTANCE = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.instance"
+
+        val handlerType = tableProperties.get(TYPE)
+        if (handlerType.isEmpty || handlerType.get.trim.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+              s"$TYPE properties must be specified.")
+        }
+        val sourceColumnsOption = tableProperties.get(SOURCE_COLUMNS)
+        if (sourceColumnsOption.isEmpty || sourceColumnsOption.get.trim.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+              s"$SOURCE_COLUMNS property must be specified.")
+        }
+        val sourcesWithoutSpaces = sourceColumnsOption.get.replaceAll("\\s", "")
+        /* Validate source columns */
+        val sources = sourcesWithoutSpaces.split(",")
+        if (sources.distinct.length != sources.size) {
+          throw new MalformedCarbonCommandException(
+            s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+              s"$SOURCE_COLUMNS property cannot have duplicate columns.")
+        }
+        val sourceTypes = StringBuilder.newBuilder
+        sources.foreach { column =>
+          tableFields.find(_.column.equalsIgnoreCase(column)) match {
+            case Some(field) => sourceTypes.append(field.dataType.get).append(",")
+            case None =>
+              throw new MalformedCarbonCommandException(
+                s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+                  s"Source column: $column in property " +
+                  s"$SOURCE_COLUMNS must be a column in the table.")
+          }
+        }
+        tableProperties.put(SOURCE_COLUMNS, sourcesWithoutSpaces)
+        tableProperties.put(SOURCE_COLUMN_TYPES, sourceTypes.dropRight(1).toString())
+        val handlerClass = tableProperties.get(HANDLER_CLASS)
+        val handlerClassName: String = handlerClass match {
+          case Some(className) => className.trim
+          case None =>
+            /* use handler type to find the default implementation */
+           if (handlerType.get.trim.equalsIgnoreCase(CarbonCommonConstants.GEOHASH)) {
+              /* Use GeoHash default implementation */
+              val className = classOf[GeoHashImpl].getName
+              tableProperties.put(HANDLER_CLASS, className)
+              className
+            } else {
+              throw new MalformedCarbonCommandException(
+                s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+                  s"Unsupported value: ${handlerType.get} specified for property $TYPE.")
+            }
+        }
+
+        try {
+          val handlerClass: Class[_] = java.lang.Class.forName(handlerClassName)
+          val instance = handlerClass.newInstance().asInstanceOf[
+            CustomIndex[Long, String, java.util.List[Array[Long]]]]
+          instance.init(handler, tableProperties.asJava)
+
+          // TODO Need to check to how to store the instance. This serialization may be incorrect.
+          val bos = new ByteArrayOutputStream()
+          val oos = new ObjectOutputStream(bos)
+          oos.writeObject(instance)
+          oos.flush()
+          tableProperties.put(HANDLER_INSTANCE, Base64.getEncoder().encodeToString(bos.toByteArray))
+        } catch {
+          case ex: ClassNotFoundException =>
+            throw new MalformedCarbonCommandException(
+              s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process failed. " +
+                s"$handlerClassName class in property $HANDLER_CLASS failed with $ex")
+          case ex@(_: InstantiationError | _: IllegalAccessException) =>
+            throw new MalformedCarbonCommandException(
+              s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process failed. " +
+                s"Instantiation of class $handlerClassName failed with $ex")
+          case ex: ClassCastException =>
+            throw new MalformedCarbonCommandException(
+              s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process failed. " +
+                s"Failed due to $ex")
+        }
+
+        /* Add target column in sort column */
+        var sortKey = tableProperties.getOrElse(CarbonCommonConstants.SORT_COLUMNS, "")
+        if (!sortKey.isEmpty) {
+          /* Source columns are not allowed to be specified in sort columns. Instead target column
+          is implicitly treated as sort column */
+          sources.foreach { column =>
+            sortKey.split(",").map(_.trim).foreach { key =>
+              if (key.equalsIgnoreCase(column)) {
+                throw new MalformedCarbonCommandException(
+                  s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid. " +
+                    s"Source column: $key is not allowed in ${CarbonCommonConstants.SORT_COLUMNS}" +
+                    s" property. Instead, handler: $handler is implicitly treated as sort column.")
+              }
+            }
+          }
+          sortKey += "," + handler
+        } else {
+          sortKey = handler
+        }
+        tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKey)
+        // TODO Need to convert it to DataType object and pass it to StructField dataType
+        val dataType = tableProperties.get(TYPE)
+        fields += getField(StructField(handler, LongType))
 
 Review comment:
   Have refactored it. Currently support only index type of long type.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services