Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153103644 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.reflect.runtime._
+import scala.reflect.runtime.universe._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.AstBuilder
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.Utils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * Reflection APIs
+ */
+
+object CarbonClassReflectionUtils {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  /**
+   * Returns the field val from a object through reflection.
+   * @param name - name of the field being retrieved.
+   * @param obj - Object from which the field has to be retrieved.
+   * @tparam T
+   * @return
+   */
+  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
+    val im = rm.reflect(obj)
+
+    im.symbol.typeSignature.members.find(
+      _.name.toString.equals(name)).map(
+      l => im.reflectField(l.asTerm).get
+      // .asInstanceOf[LogicalPlan]
+    ).getOrElse(null)
+  }
+
+  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
+    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
+      true
+    } else {
+      false
+    }
+    hasField
+  }
+
+  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
+      tableAlias: Option[String] = None): UnresolvedRelation = {
+
+    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
+    try {
+      // For 2.1
+      clazz.getDeclaredField("alias")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val unresolvedrelation = ctor
+        .newInstance(tableIdentifier,
+          Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
+      unresolvedrelation
+    } catch {
+      case ce: NoSuchFieldException =>
+        // For Spark-2.2
+        val ctor = clazz.getConstructors.head
+        ctor.setAccessible(true)
+        val unresolvedrelation = ctor
+          .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
+        unresolvedrelation
+    }
+  }
+
+  def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
+      relation: LogicalPlan,
+      view: Option[TableIdentifier]): SubqueryAlias = {
+    if (sparkSession.version.contains("2.1")) {
+      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
+      //   Project(projList, relation), Option(table.tableIdentifier))
+      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val subqueryAlias = ctor
+        .newInstance(alias.getOrElse(""), relation, Option(view)).asInstanceOf[SubqueryAlias]
+      subqueryAlias
+    } else if (sparkSession.version.contains("2.2")) {
+      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
+      //   Project(projList, relation))
+      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val subqueryAlias = ctor
+        .newInstance(alias.getOrElse(""), relation).asInstanceOf[SubqueryAlias]
+      subqueryAlias
+    } else {
+      throw new UnsupportedOperationException("Unsupported Spark version")
+    }
+  }
+
+  def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
+    var overwriteboolean: Boolean = false
+    val im = rm.reflect(obj)
+    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
+      overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
+    }
+    overwriteboolean
+  }
+
+  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
+    var overwriteboolean: Boolean = false
+    val im = rm.reflect(obj)
+    for (m <- typeOf[T].members.filter(!_.isMethod)) {
+      if (m.toString.contains("overwrite")) {
+        val typ = m.typeSignature
+        if (typ.toString.contains("Boolean")) {
+          // Spark2.2
+          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
+        } else {
+          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
+        }
+      }
+    }
+    overwriteboolean
+  }
+
+  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
+    val im = rm.reflect(obj)
+    val sym = im.symbol.typeSignature.member(TermName(name))
+    val tableMeta =
+      im.reflectMethod(sym.asMethod).apply()
+    tableMeta
+  }
+
+  def getAstBuilder(conf: SQLConf,
+      sqlParser: CarbonSpark2SqlParser,
+      sparkSession: SparkSession): AstBuilder = {
+    if (sparkSession.version.contains("2.1")) {
+      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
+      astBuilder
+    } else if (sparkSession.version.contains("2.2")) {
+      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
+      astBuilder
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
+    if (sparkContext.version.contains("2.1")) {
--- End diff --
This is not correct: `2.2.1` contains `2.1`, so it will match the wrong branch. Move version matching to a common utility. --- |
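For reference, a minimal sketch of what such a common version-matching utility could look like; the object name `SparkVersionUtil` is hypothetical and not part of this PR. Comparing on the major.minor prefix avoids the substring pitfall, since `"2.2.1".contains("2.1")` is true:

```scala
// Hypothetical sketch of a shared version-matching helper, not part of this PR.
object SparkVersionUtil {

  // Extracts "major.minor" from a full version string, e.g. "2.2.1" -> "2.2".
  private def majorMinor(version: String): String =
    version.split('.').take(2).mkString(".")

  // True only when the running Spark's major.minor equals the expected one,
  // so "2.2.1" matches "2.2" but never "2.1".
  def isSparkVersion(sparkVersion: String, expected: String): Boolean =
    majorMinor(sparkVersion) == expected
}

// SparkVersionUtil.isSparkVersion("2.2.1", "2.1") == false
// SparkVersionUtil.isSparkVersion("2.2.1", "2.2") == true
```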
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153103936 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -152,7 +152,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
       new ResolveDataSource(sparkSession) :: Nil
     } else {
       Nil
-    })
+    }
+    )
--- End diff --
Wrong indentation. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153104066 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -172,6 +173,10 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
       conf,
       newHadoopConf())
   }
+
+  def getCarbonEnv() : CarbonEnv = {
+    catalog.carbonEnv
--- End diff --
Why make it session-based instead of a global CarbonEnv? --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153111537 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
(same hunk as quoted above; the comment targets this addition)
+  def getCarbonEnv() : CarbonEnv = {
+    catalog.carbonEnv
--- End diff --
Removed; it is not needed and had only been added temporarily. --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153111555 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
(same hunk as quoted above)
-    })
+    }
+    )
--- End diff --
Rectified --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153111873 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.reflect.runtime._
+import scala.reflect.runtime.universe._
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * Reflection APIs
+ */
+
+object CarbonReflectionUtils {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  /**
+   * Returns the field val from a object through reflection.
+   * @param name - name of the field being retrieved.
+   * @param obj - Object from which the field has to be retrieved.
+   * @tparam T
+   * @return
+   */
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
+    val im = rm.reflect(obj)
+
+    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
+      .map(l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]).getOrElse(null)
+  }
+
+  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
+    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
--- End diff --
Done --- |
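Since this is where the reflection helpers first appear, here is a self-contained sketch of the same runtime-mirror technique that `getField` uses; the `Person` case class and `ReflectionDemo` wrapper are invented for the example and are not part of the PR:

```scala
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}

// Illustrative type, not from the PR.
case class Person(name: String, age: Int)

object ReflectionDemo {
  private val rm = ru.runtimeMirror(getClass.getClassLoader)

  // Same technique as the PR's getField: find the member by name on the
  // runtime type, then read its value through a field mirror.
  def getField[T: ru.TypeTag : ClassTag](name: String, obj: T): Any = {
    val im = rm.reflect(obj)
    im.symbol.typeSignature.members
      .find(_.name.toString == name)
      .map(sym => im.reflectField(sym.asTerm).get)
      .orNull
  }

  def main(args: Array[String]): Unit = {
    println(getField("name", Person("carbon", 1))) // prints "carbon"
  }
}
```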
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153112035 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
(same file as quoted above; the comment targets this hunk)
+      // For 2.1
+      clazz.getDeclaredField("alias")
+      val ctor = clazz.getConstructors.head
+      ctor.setAccessible(true)
+      val unresolvedrelation = ctor
--- End diff --
Done --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153112194 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---
@@ -101,13 +101,15 @@ case class UpdateTable(
     table: UnresolvedRelation,
     columns: List[String],
     selectStmt: String,
+    alias: Option[String],
--- End diff --
Done --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153112203 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---
@@ -101,13 +101,15 @@ case class UpdateTable(
     table: UnresolvedRelation,
     columns: List[String],
     selectStmt: String,
+    alias: Option[String],
     filer: String) extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq.empty
   override def output: Seq[AttributeReference] = Seq.empty
 }

 case class DeleteRecords(
     statement: String,
+    alias: Option[String],
--- End diff --
Ok --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153112830 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
@@ -84,7 +87,16 @@ class CarbonDecoderProcessor {
       }
       nodeList.add(ArrayCarbonNode(nodeListSeq))
     case e: UnaryNode => process(e.child, nodeList)
-    case i: InsertIntoTable => process(i.child, nodeList)
+    case i: InsertIntoTable =>
+      var sparkVersion21: Boolean = false
--- End diff --
Done --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153113370 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
(same file as quoted above; the comment targets this hunk)
+  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
+    if (sparkContext.version.contains("2.1")) {
--- End diff --
Modified this to use equals instead of contains; it now checks against the actual version, i.e. 2.1.0 or 2.2.0. --- |
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153113700 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
(same file as quoted above; the comment targets this hunk)
+  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
--- End diff --
None of the places from which UnresolvedRelation is constructed have a SparkSession or SparkContext available to read the version from, e.g. UpdateRelation in CarbonSpark2SqlParser.scala. Therefore I used the existence of a field to decide which parameters to pass; the final motive is simply to pass the parameters correctly. --- |
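A compact sketch of the field-existence technique described here, lifted from the PR's own getUnresolvedRelation: instead of consulting a version string, probe whether UnresolvedRelation still has the Spark 2.1-era `alias` field.

```scala
import org.apache.spark.util.Utils

// Probes the shape of UnresolvedRelation: the `alias` constructor field
// exists in Spark 2.1 but not in Spark 2.2, so its presence tells us
// which constructor arity to invoke via reflection.
def unresolvedRelationHasAlias: Boolean = {
  val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
  try {
    clazz.getDeclaredField("alias")
    true
  } catch {
    case _: NoSuchFieldException => false
  }
}
```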
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153116624 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
(same file as quoted above; the comment targets this hunk)
+  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
+    if (sparkContext.version.contains("2.1")) {
--- End diff --
I think it needs to use startsWith("2.1"): if it uses equals, it will fail when I run Spark 2.2.1. --- |
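The difference between the three candidate checks is easy to see on a patch release such as 2.2.1:

```scala
val version = "2.2.1"
version.contains("2.1")    // true  -- the original bug: 2.2.1 matches the 2.1 branch
version == "2.2.0"         // false -- exact equality rejects patch releases
version.startsWith("2.2")  // true  -- matches every 2.2.x release, as suggested here
```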
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153117349 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
(same file as quoted above; the comment targets this hunk)
+  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
--- End diff --
I think we can add a parameter called "SPARK_VERSION" to CarbonProperties: when the CarbonSession starts, set it from SparkContext.version, and then we can use it anywhere. --- |
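A sketch of this suggestion, assuming CarbonProperties' usual addProperty/getProperty API; the property key "carbon.spark.version" is made up for illustration:

```scala
import org.apache.carbondata.core.util.CarbonProperties

// When the CarbonSession is created, remember the running Spark version.
// (The key name is hypothetical; pick whatever constant the project prefers.)
CarbonProperties.getInstance()
  .addProperty("carbon.spark.version", sparkContext.version)

// Later, anywhere in the code base, even without a SparkSession in scope:
val sparkVersion = CarbonProperties.getInstance()
  .getProperty("carbon.spark.version")
val isSpark21 = sparkVersion != null && sparkVersion.startsWith("2.1")
```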
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153117393 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -58,6 +57,8 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
       )
     }
     if (child.output.size >= relation.carbonRelation.output.size) {
+      sparkVersion21 = !CarbonClassReflectionUtils.hasField("query", InsertIntoCarbonTable)
--- End diff --
Same suggestion as above: add a "SPARK_VERSION" parameter to CarbonProperties, set it from SparkContext.version when the CarbonSession starts, and use it everywhere instead. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1502/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1507/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1511/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1512/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1513/ --- |