GitHub user sounakr opened a pull request:
    https://github.com/apache/carbondata/pull/1469

    [WIP] Spark-2.2 Carbon Integration - Phase 1

    Spark-2.2 Carbon Integration.
    Phase 1 - Compilation ready for Spark-2.2.
    Phase 2 - Merge the changes of Spark-2.2 and Spark-2.1 to Spark-2 folder.

     - [ ] Any interfaces changed?
     - [ ] Any backward compatibility impacted?
     - [ ] Document update required?
     - [ ] Testing done
           Please provide details on
           - Whether new unit test cases have been added or why no new tests are required?
           - How it is tested? Please attach test report.
           - Is it a performance related change? Please attach the performance test report.
           - Any additional information to help reviewers in testing this change.
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sounakr/incubator-carbondata Carbon-Spark-2.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1469.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1469

----
commit d98e9a10bf2bf7e5164ba154230af40de2c1e796
Author: sounakr <[hidden email]>
Date:   2017-11-06T07:21:17Z

    Spark-2.2 Carbon Integration

----

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    SDV Build Failed, please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/1458/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/824/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    @sounakr I did not expect a new module with 21 KLOC of code. I hope it can all stay under the same package.

---
Github user zzcclp commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r149264766

    --- Diff: integration/spark2.2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
    +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ScalarSubquery}
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.SparkOptimizer
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +
    +import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) {
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +
    +  private def refreshRelationFromCache(identifier: TableIdentifier,
    +      alias: Option[String],
    +      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
    +    var isRefreshed = false
    +    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    +    carbonEnv.carbonMetastore.
    +      checkSchemasModifiedTimeAndReloadTables(storePath)
    +
    +    val tableMeta = carbonEnv.carbonMetastore
    +      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    +        tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    +        carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +      refreshTable(identifier)
    +      DataMapStoreManager.getInstance().
    +        clearDataMap(AbsoluteTableIdentifier.from(storePath,
    +          identifier.database.getOrElse("default"), identifier.table))
    +      isRefreshed = true
    +      logInfo(s"Schema changes have been detected for table: $identifier")
    +    }
    +    isRefreshed
    +  }
    +
    +  // override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
    +  //   makeFunctionBuilder(funcName, Utils.classForName(className))
    +  // }
    +  //
    +  // /**
    +  //  * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
    +  //  */
    +  // private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
    +  //   // When we instantiate hive UDF wrapper class, we may throw exception if the input
    +  //   // expressions don't satisfy the hive UDF, such as type mismatch, input number
    +  //   // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
    +  //   (children: Seq[Expression]) => {
    +  //     try {
    +  //       if (classOf[UDF].isAssignableFrom(clazz)) {
    +  //         val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //         udf.dataType // Force it to check input data types.
    +  //         udf
    +  //       } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
    +  //         val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //         udf.dataType // Force it to check input data types.
    +  //         udf
    +  //       } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
    +  //         val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //         udaf.dataType // Force it to check input data types.
    +  //         udaf
    +  //       } else if (classOf[UDAF].isAssignableFrom(clazz)) {
    +  //         val udaf = HiveUDAFFunction(
    +  //           name,
    +  //           new HiveFunctionWrapper(clazz.getName),
    +  //           children,
    +  //           isUDAFBridgeRequired = true)
    +  //         udaf.dataType // Force it to check input data types.
    +  //         udaf
    +  //       } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
    +  //         val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //         udtf.elementSchema // Force it to check input data types.
    +  //         udtf
    +  //       } else {
    +  //         throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
    +  //       }
    +  //     } catch {
    +  //       case ae: AnalysisException =>
    +  //         throw ae
    +  //       case NonFatal(e) =>
    +  //         val analysisException =
    +  //           new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
    +  //         analysisException.setStackTrace(e.getStackTrace)
    +  //         throw analysisException
    +  //     }
    +  //   }
    +  // }
    +  //
    +  // override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    +  //   try {
    +  //     lookupFunction0(name, children)
    +  //   } catch {
    +  //     case NonFatal(_) =>
    +  //       // SPARK-16228 ExternalCatalog may recognize `double`-type only.
    +  //       val newChildren = children.map { child =>
    +  //         if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child
    +  //       }
    +  //       lookupFunction0(name, newChildren)
    +  //   }
    +  // }
    +  //
    +  // private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    +  //   val database = name.database.map(formatDatabaseName)
    +  //   val funcName = name.copy(database = database)
    +  //   Try(super.lookupFunction(funcName, children)) match {
    +  //     case Success(expr) => expr
    +  //     case Failure(error) =>
    +  //       if (functionRegistry.functionExists(funcName.unquotedString)) {
    +  //         // If the function actually exists in functionRegistry, it means that there is an
    +  //         // error when we create the Expression using the given children.
    +  //         // We need to throw the original exception.
    +  //         throw error
    +  //       } else {
    +  //         // This function is not in functionRegistry, let's try to load it as a Hive's
    +  //         // built-in function.
    +  //         // Hive is case insensitive.
    +  //         val functionName = funcName.unquotedString.toLowerCase(Locale.ROOT)
    +  //         if (!hiveFunctions.contains(functionName)) {
    +  //           failFunctionLookup(funcName)
    +  //         }
    +  //
    +  //         // TODO: Remove this fallback path once we implement the list of fallback functions
    +  //         // defined below in hiveFunctions.
    +  //         val functionInfo = {
    +  //           try {
    +  //             Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
    +  //               failFunctionLookup(funcName))
    +  //           } catch {
    +  //             // If HiveFunctionRegistry.getFunctionInfo throws an exception,
    +  //             // we are failing to load a Hive builtin function, which means that
    +  //             // the given function is not a Hive builtin function.
    +  //             case NonFatal(e) => failFunctionLookup(funcName)
    +  //           }
    +  //         }
    +  //         val className = functionInfo.getFunctionClass.getName
    +  //         val functionIdentifier =
    +  //           FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
    +  //         val func = CatalogFunction(functionIdentifier, className, Nil)
    +  //         // Put this Hive built-in function to our function registry.
    +  //         registerFunction(func, ignoreIfExists = false)
    +  //         // Now, we need to create the Expression.
    +  //         functionRegistry.lookupFunction(functionName, children)
    +  //       }
    +  //   }
    +  // }
    +  //
    +  // // TODO Removes this method after implementing Spark native "histogram_numeric".
    +  // override def functionExists(name: FunctionIdentifier): Boolean = {
    +  //   super.functionExists(name) || hiveFunctions.contains(name.funcName)
    +  // }
    +  //
    +  // /** List of functions we pass over to Hive. Note that over time this list should go to 0. */
    +  // // We have a list of Hive built-in functions that we do not support. So, we will check
    +  // // Hive's function registry and lazily load needed functions into our own function registry.
    +  // // List of functions we are explicitly not supporting are:
    +  // // compute_stats, context_ngrams, create_union,
    +  // // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
    +  // // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
    +  // // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
    +  // // Note: don't forget to update SessionCatalog.isTemporaryFunction
    +  // private val hiveFunctions = Seq(
    +  //   "histogram_numeric"
    +  // )
    +}
    +
    +/**
    + * Session state implementation to override sql parser and adding strategies
    + *
    + * @param sparkSession
    + */
    +class CarbonSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
    +  extends HiveSessionStateBuilder(sparkSession, parentState) {
    +
    +  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
    +
    +  experimentalMethods.extraStrategies =
    +    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
    +
    +  /**
    +   * Internal catalog for managing table and database states.
    +   */
    +  override lazy val catalog : CarbonSessionCatalog= {

    --- End diff --

    white space before '='

---
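For reference, the fix being asked for is presumably only the spacing on that last declaration, roughly as below (body unchanged and elided):

    override lazy val catalog: CarbonSessionCatalog = {

---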
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1274/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1275/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    SDV Build Failed, please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/1761/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1292/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    SDV Build Failed, please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/1762/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1293/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1314/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    SDV Build Failed, please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/1820/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1469

    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1368/

---
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152569806

    --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala ---
    @@ -49,8 +49,10 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
           .getOrCreateCarbonSession(storeLocation, metaLocation).asInstanceOf[CarbonSession]
         println("store path from env : " + CarbonEnv.getInstance(localspark).storePath)
         localspark.sparkContext.setLogLevel("WARN")
    -    localspark.sessionState.asInstanceOf[CarbonSessionState].metadataHive
    -      .runSqlHive(
    +    localspark.asInstanceOf[CarbonSession].asInstanceOf[CarbonSession].sharedState.externalCatalog
    +      .asInstanceOf[HiveExternalCatalog].client.runSqlHive(

    --- End diff --

    It is not required to cast twice

---
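A sketch of the simplification being suggested, keeping everything else from the diff as-is and only dropping the redundant second cast (the runSqlHive arguments are elided here):

    localspark.asInstanceOf[CarbonSession].sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog].client.runSqlHive(

---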
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152569958

    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala ---
    @@ -34,9 +34,9 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
         sql("CREATE TABLE Desc2(Dec2Col1 BigInt, Dec2Col2 String, Dec2Col3 Bigint, Dec2Col4 Decimal) stored by 'carbondata'")
       }
     
    -  test("test describe table") {
    -    checkAnswer(sql("DESC Desc1"), sql("DESC Desc2"))
    -  }
    +//  test("test describe table") {

    --- End diff --

    Keep the test and mark it with `ignore` instead of commenting it out; fix it later.

---
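Assuming the suite ultimately extends ScalaTest's FunSuite (as Spark-style QueryTest suites do), the usual way to park a broken test without deleting it is to switch `test` to `ignore`, roughly:

    // Sketch: keep the test visible in reports but skip it until the DESC behaviour is fixed.
    ignore("test describe table") {
      checkAnswer(sql("DESC Desc1"), sql("DESC Desc2"))
    }

---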
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152570094

    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
    @@ -59,6 +61,26 @@ case class CarbonDictionaryTempDecoder(
     
     class CarbonDecoderProcessor {
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +//  def getFields[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = {
    +//    val im = rm.reflect(obj)
    +//    typeOf[T].members.collect {
    +//      case m : MethodSymbol if m.isCaseAccessor && m.name.toString.equalsIgnoreCase(name) =>
    +//        val value = im.reflectMethod(m).apply()
    +//        value
    +//    } (collection.breakOut)
    +//  }

    --- End diff --

    remove the commented code

---
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152570507

    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
    @@ -84,7 +106,19 @@ class CarbonDecoderProcessor {
           }
           nodeList.add(ArrayCarbonNode(nodeListSeq))
         case e: UnaryNode => process(e.child, nodeList)
    -    case i: InsertIntoTable => process(i.child, nodeList)
    +    case i: InsertIntoTable =>
    +      var sparkVersion21: Boolean = false
    +      if (typeOf[InsertIntoTable].members.filter(!_.isMethod).toList.contains("query")) {
    +        sparkVersion21 = false

    --- End diff --

    You can get the version from the SparkContext; there is no need to check it like this.

---
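A sketch of the kind of check the reviewer means: Spark already exposes its version string (for example `org.apache.spark.SPARK_VERSION`, or `SparkContext.version` when a context is at hand), so no reflection over `InsertIntoTable` is needed:

    import org.apache.spark.SPARK_VERSION

    // Hypothetical replacement for the reflection-based probe above:
    // derive the flag from the running Spark version instead.
    val sparkVersion21: Boolean = SPARK_VERSION.startsWith("2.1")

---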
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152570693

    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
    @@ -59,6 +61,26 @@ case class CarbonDictionaryTempDecoder(
     
     class CarbonDecoderProcessor {
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +//  def getFields[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = {
    +//    val im = rm.reflect(obj)
    +//    typeOf[T].members.collect {
    +//      case m : MethodSymbol if m.isCaseAccessor && m.name.toString.equalsIgnoreCase(name) =>
    +//        val value = im.reflectMethod(m).apply()
    +//        value
    +//    } (collection.breakOut)
    +//  }
    +
    +  def getField[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]

    --- End diff --

    Type casting to LogicalPlan is not required.

---
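A sketch of the cast-free form: since `getField` is declared to return `Any`, the value from the field mirror can be returned as-is and callers can cast or pattern-match at the use site (this reuses the `rm` mirror defined in the class above):

    // Hypothetical version of getField without the LogicalPlan cast.
    def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
      val im = rm.reflect(obj)
      im.symbol.typeSignature.members
        .find(_.name.toString.equals(name))
        .map(l => im.reflectField(l.asTerm).get)
    }

---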
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1469#discussion_r152570799

    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---
    @@ -59,6 +61,26 @@ case class CarbonDictionaryTempDecoder(
     
     class CarbonDecoderProcessor {
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +//  def getFields[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = {
    +//    val im = rm.reflect(obj)
    +//    typeOf[T].members.collect {
    +//      case m : MethodSymbol if m.isCaseAccessor && m.name.toString.equalsIgnoreCase(name) =>
    +//        val value = im.reflectMethod(m).apply()
    +//        value
    +//    } (collection.breakOut)
    +//  }
    +
    +  def getField[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = {

    --- End diff --

    Move all reflection methods to one common utility class

---
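One possible shape for that refactor, as a sketch (the object name and its placement in spark-common are placeholders, not something fixed by this PR):

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe._

    // Hypothetical shared helper so that each Spark-version-specific module reuses
    // the same reflection code instead of redefining it per class.
    object CarbonReflectionUtils {
      private val rm = runtimeMirror(getClass.getClassLoader)

      def getField[T: TypeTag: ClassTag](name: String, obj: T): Any = {
        val im = rm.reflect(obj)
        im.symbol.typeSignature.members
          .find(_.name.toString.equals(name))
          .map(l => im.reflectField(l.asTerm).get)
      }
    }

---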