[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153103644
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTable
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get
    +        // .asInstanceOf[LogicalPlan]
    +    ).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    +      true
    +    } else {
    +      false
    +    }
    +    hasField
    +  }
    +
    +  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
    +      tableAlias: Option[String] = None): UnresolvedRelation = {
    +
    +    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +    try {
    +      // For 2.1
    +      clazz.getDeclaredField("alias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(tableIdentifier,
    +          Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +      unresolvedrelation
    +    } catch {
    +      case ce: NoSuchFieldException =>
    +        // For Spark-2.2
    +        val ctor = clazz.getConstructors.head
    +        ctor.setAccessible(true)
    +        val unresolvedrelation = ctor
    +          .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    }
    +  }
    +
    +  def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
    +      relation: LogicalPlan,
    +      view: Option[TableIdentifier]): SubqueryAlias = {
    +    if (sparkSession.version.contains("2.1")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //   Project(projList, relation), Option(table.tableIdentifier))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation, Option(view)).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else if (sparkSession.version.contains("2.2")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //  Project(projList, relation))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else {
    +      throw new UnsupportedOperationException("Unsupported Spark version")
    +    }
    +  }
    +
    +  def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
    +      overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (m <- typeOf[T].members.filter(!_.isMethod)) {
    +      if (m.toString.contains("overwrite")) {
    +        val typ = m.typeSignature
    +        if (typ.toString.contains("Boolean")) {
    +          // Spark2.2
    +          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
    +        } else {
    +          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
    +        }
    +      }
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta
    +  }
    +
    +  def getAstBuilder(conf: SQLConf,
    +      sqlParser: CarbonSpark2SqlParser,
    +      sparkSession: SparkSession): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else {
    +      throw new UnsupportedOperationException("Spark version not supported")
    +    }
    +  }
    +
    +  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
    +    if (sparkContext.version.contains("2.1")) {
    --- End diff --
   
    This is not correct: `2.2.1` contains `2.1`, so `contains("2.1")` also matches Spark 2.2.1 and gives a wrong match. Move the version matching to a common utility.
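
A minimal sketch of such a common utility, assuming the release line can be read from the first two dot-separated components of the version string. `SparkVersionUtil`, `majorMinor` and `isRelease` are hypothetical names for illustration; they are not part of the PR or of Spark.

```scala
// Hypothetical helper, not part of the PR: compares Spark versions by their
// "major.minor" prefix instead of using a substring search.
object SparkVersionUtil {

  /** Extracts "major.minor" from a version string such as "2.2.1" or "2.1.0-SNAPSHOT". */
  def majorMinor(version: String): String =
    version.split("\\.").take(2).mkString(".")

  /** True when `version` belongs to the given release line, e.g. "2.1". */
  def isRelease(version: String, line: String): Boolean =
    majorMinor(version) == line
}
```

With this sketch, `SparkVersionUtil.isRelease("2.2.1", "2.1")` is false, whereas `"2.2.1".contains("2.1")` is true, which is the wrong match described above.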


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153103936
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -152,7 +152,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
               new ResolveDataSource(sparkSession) :: Nil
             } else {
               Nil
    -        })
    +        }
    +          )
    --- End diff --
   
    Wrong indentation.


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153104066
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -172,6 +173,10 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
           conf,
           newHadoopConf())
       }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    catalog.carbonEnv
    --- End diff --
   
    Why make it session based instead of a global CarbonEnv?


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153111537
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -172,6 +173,10 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
           conf,
           newHadoopConf())
       }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    catalog.carbonEnv
    --- End diff --
   
    Removed. It was added temporarily and is not needed.


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153111555
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -152,7 +152,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
               new ResolveDataSource(sparkSession) :: Nil
             } else {
               Nil
    -        })
    +        }
    +          )
    --- End diff --
   
    Rectified


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153111873
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
    +      .map(l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153112035
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
    +      .map(l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    +      true
    +    } else {
    +      false
    +    }
    +    hasField
    +  }
    +
    +  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
    +      tableAlias: Option[String] = None): UnresolvedRelation = {
    +
    +    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +    try {
    +      // For 2.1
    +      clazz.getDeclaredField("alias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153112194
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---
    @@ -101,13 +101,15 @@ case class UpdateTable(
         table: UnresolvedRelation,
         columns: List[String],
         selectStmt: String,
    +    alias: Option[String],
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153112203
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---
    @@ -101,13 +101,15 @@ case class UpdateTable(
         table: UnresolvedRelation,
         columns: List[String],
         selectStmt: String,
    +    alias: Option[String],
         filer: String) extends LogicalPlan {
       override def children: Seq[LogicalPlan] = Seq.empty
       override def output: Seq[AttributeReference] = Seq.empty
     }
     
     case class DeleteRecords(
         statement: String,
    +    alias: Option[String],
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153112830
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
    @@ -84,7 +87,16 @@ class CarbonDecoderProcessor {
             }
             nodeList.add(ArrayCarbonNode(nodeListSeq))
           case e: UnaryNode => process(e.child, nodeList)
    -      case i: InsertIntoTable => process(i.child, nodeList)
    +      case i: InsertIntoTable =>
    +        var sparkVersion21: Boolean = false
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153113370
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTable
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get
    +        // .asInstanceOf[LogicalPlan]
    +    ).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    +      true
    +    } else {
    +      false
    +    }
    +    hasField
    +  }
    +
    +  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
    +      tableAlias: Option[String] = None): UnresolvedRelation = {
    +
    +    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +    try {
    +      // For 2.1
    +      clazz.getDeclaredField("alias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(tableIdentifier,
    +          Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +      unresolvedrelation
    +    } catch {
    +      case ce: NoSuchFieldException =>
    +        // For Spark-2.2
    +        val ctor = clazz.getConstructors.head
    +        ctor.setAccessible(true)
    +        val unresolvedrelation = ctor
    +          .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    }
    +  }
    +
    +  def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
    +      relation: LogicalPlan,
    +      view: Option[TableIdentifier]): SubqueryAlias = {
    +    if (sparkSession.version.contains("2.1")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //   Project(projList, relation), Option(table.tableIdentifier))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation, Option(view)).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else if (sparkSession.version.contains("2.2")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //  Project(projList, relation))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else {
    +      throw new UnsupportedOperationException("Unsupported Spark version")
    +    }
    +  }
    +
    +  def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
    +      overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (m <- typeOf[T].members.filter(!_.isMethod)) {
    +      if (m.toString.contains("overwrite")) {
    +        val typ = m.typeSignature
    +        if (typ.toString.contains("Boolean")) {
    +          // Spark2.2
    +          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
    +        } else {
    +          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
    +        }
    +      }
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta
    +  }
    +
    +  def getAstBuilder(conf: SQLConf,
    +      sqlParser: CarbonSpark2SqlParser,
    +      sparkSession: SparkSession): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else {
    +      throw new UnsupportedOperationException("Spark version not supported")
    +    }
    +  }
    +
    +  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
    +    if (sparkContext.version.contains("2.1")) {
    --- End diff --
   
    Modified this to use `equals` instead of `contains`. It now also specifies the actual version, i.e. 2.1.0 or 2.2.0.
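
A minimal sketch of what exact matching implies, assuming the dispatch compares the full version string against the two versions named above. `ExactVersionDispatch` and the returned labels are hypothetical and only stand in for the real session-state construction.

```scala
// Hypothetical dispatcher illustrating exact version matching; names are not from the PR.
object ExactVersionDispatch {

  /** Returns a label for the code path selected for `version`. */
  def sessionStateFor(version: String): String = version match {
    case "2.1.0" => "spark-2.1 session state"
    case "2.2.0" => "spark-2.2 session state"
    case other =>
      // Any other version, including patch releases, is rejected.
      throw new UnsupportedOperationException(s"Spark version $other is not supported")
  }
}
```

With exact equality, `2.2.1` no longer falls into the 2.1 branch, but it does not reach the 2.2 branch either, so a patch release ends up in the unsupported case unless it is listed explicitly.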


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153113700
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
    +      .map(l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    --- End diff --
   
    In all the places from which UnresolvedRelation is called, I don't have the SparkSession or SparkContext from which to get the version, e.g. in UpdateRelation of CarbonSpark2SqlParser.scala.
    Therefore I check for the existence of a field to decide which parameters to pass. The final goal is simply to pass the parameters correctly.
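
The field-existence technique can be sketched as follows, assuming plain Java reflection on the class is enough to tell the two constructor shapes apart. `RelationWithAlias`, `RelationWithoutAlias` and `ConstructorArgs` are hypothetical stand-ins for illustration; they are not Spark or CarbonData classes.

```scala
// Hypothetical stand-ins for the two constructor shapes discussed above; not Spark classes.
case class RelationWithAlias(table: String, alias: Option[String])
case class RelationWithoutAlias(table: String)

object ConstructorArgs {

  /** True when `clazz` itself declares a field called `name`. */
  def declaresField(clazz: Class[_], name: String): Boolean =
    clazz.getDeclaredFields.exists(_.getName == name)

  /** Chooses the argument list from the class shape, without needing a Spark version. */
  def forRelation(clazz: Class[_], table: String, alias: Option[String]): Seq[AnyRef] =
    if (declaresField(clazz, "alias")) Seq(table, alias) else Seq(table)
}
```

The resulting sequence could then be handed to a reflective constructor call, for example `ctor.newInstance(args: _*)`, so the same call site works for either shape.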


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153116624
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,195 @@
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTable
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get
    +        // .asInstanceOf[LogicalPlan]
    +    ).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    +      true
    +    } else {
    +      false
    +    }
    +    hasField
    +  }
    +
    +  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
    +      tableAlias: Option[String] = None): UnresolvedRelation = {
    +
    +    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +    try {
    +      // For 2.1
    +      clazz.getDeclaredField("alias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(tableIdentifier,
    +          Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +      unresolvedrelation
    +    } catch {
    +      case ce: NoSuchFieldException =>
    +        // For Spark-2.2
    +        val ctor = clazz.getConstructors.head
    +        ctor.setAccessible(true)
    +        val unresolvedrelation = ctor
    +          .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    }
    +  }
    +
    +  def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
    +      relation: LogicalPlan,
    +      view: Option[TableIdentifier]): SubqueryAlias = {
    +    if (sparkSession.version.contains("2.1")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //   Project(projList, relation), Option(table.tableIdentifier))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation, Option(view)).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else if (sparkSession.version.contains("2.2")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //  Project(projList, relation))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else {
    +      throw new UnsupportedOperationException("Unsupported Spark version")
    +    }
    +  }
    +
    +  def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
    +      overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (m <- typeOf[T].members.filter(!_.isMethod)) {
    +      if (m.toString.contains("overwrite")) {
    +        val typ = m.typeSignature
    +        if (typ.toString.contains("Boolean")) {
    +          // Spark2.2
    +          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
    +        } else {
    +          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
    +        }
    +      }
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta
    +  }
    +
    +  def getAstBuilder(conf: SQLConf,
    +      sqlParser: CarbonSpark2SqlParser,
    +      sparkSession: SparkSession): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else {
    +      throw new UnsupportedOperationException("Spark version not supported")
    +    }
    +  }
    +
    +  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
    +    if (sparkContext.version.contains("2.1")) {
    --- End diff --
   
    I think this needs to use startsWith("2.1"). With the current contains check, Spark 2.2.1 will fail, because "2.2.1" also contains "2.1" and takes the Spark 2.1 branch.
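
    To make the difference concrete, here is a small sketch of the prefix check. VersionMatch and its method names are hypothetical and not part of the PR; only the standard String methods are used.

```scala
object VersionMatch {
  // Prefix match: only versions whose string begins with "2.1" qualify.
  def isSpark21(version: String): Boolean = version.startsWith("2.1")

  // Prefix match for the 2.2 line, which includes 2.2.1.
  def isSpark22(version: String): Boolean = version.startsWith("2.2")
}
```

    With a substring check, "2.2.1".contains("2.1") is true, so a 2.2.1 runtime would be routed to the 2.1 code path. The prefix check avoids that for the versions discussed here.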


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153117349
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -0,0 +1,86 @@
    +
    +package org.apache.spark.util
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
    +      .map(l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    --- End diff --
   
    I think we can add a parameter called "SPARK_VERSION" to CarbonProperties. When CarbonSession starts, set this parameter to SparkContext.version; then we can use it anywhere.
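
    A rough sketch of that idea, under stated assumptions: VersionRegistry is a hypothetical stand-in built on java.util.Properties so the example is self-contained. It is not the CarbonProperties API, and where the version is registered at session start is left open.

```scala
import java.util.Properties

// Hypothetical stand-in for a process-wide property store.
object VersionRegistry {
  private val props = new Properties()

  // Key name taken from the suggestion above.
  val SparkVersionKey = "SPARK_VERSION"

  // Intended to be called once at session start, e.g. with SparkContext.version.
  def register(version: String): Unit = {
    props.setProperty(SparkVersionKey, version)
  }

  // Returns None when no session has registered a version yet.
  def sparkVersion: Option[String] = Option(props.getProperty(SparkVersionKey))
}
```

    Code that has no session in scope, such as a parser, could then read VersionRegistry.sparkVersion instead of probing fields. Returning an Option makes the "no session started yet" case explicit for the caller.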


---

[GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153117393
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -58,6 +57,8 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
             )
         }
         if (child.output.size >= relation.carbonRelation.output.size) {
    +      sparkVersion21 = !CarbonClassReflectionUtils.hasField("query", InsertIntoCarbonTable)
    --- End diff --
   
    I think we can add a parameter called "SPARK_VERSION" to CarbonProperties. When CarbonSession starts, set this parameter to SparkContext.version; then we can use it anywhere.


---

[GitHub] carbondata issue #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 C...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1502/



---

[GitHub] carbondata issue #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 C...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1507/



---

[GitHub] carbondata issue #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 C...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1511/



---

[GitHub] carbondata issue #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 C...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1512/



---

[GitHub] carbondata issue #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 C...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1513/



---