Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152726082 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -26,13 +26,20 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} +import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + --- End diff -- remove empty line --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728216 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localalias = alias match { --- End diff -- use localAlias instead of localalias. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728246 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localalias = alias match { + case Some(a) => Some(alias.toSeq) + case _ => None + } val projList = Seq( - UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) + UnresolvedAlias(UnresolvedStar(localalias)), tupleId) // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), - Project(projList, relation), Option(table.tableIdentifier)) +// SubqueryAlias(alias.getOrElse(""), +// Project(projList, relation), Option(table.tableIdentifier)) +// + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728263 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localalias = alias match { + case Some(a) => Some(alias.toSeq) + case _ => None + } val projList = Seq( - UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) + UnresolvedAlias(UnresolvedStar(localalias)), tupleId) // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), - Project(projList, relation), Option(table.tableIdentifier)) +// SubqueryAlias(alias.getOrElse(""), +// Project(projList, relation), Option(table.tableIdentifier)) +// + if (sparkSession.version.contains("2.1")) { + // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), + // Project(projList, relation), Option(table.tableIdentifier)) + val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), + Project(projList, relation), Option(table.tableIdentifier)).asInstanceOf[SubqueryAlias] + subqueryAlias + } else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728384 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) --- End diff -- the indent of above 3 lines is wrong --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728413 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728420 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { + // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) + val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] + subqueryAlias + } else if (sparkSession.version.contains("2.2")) { --- End diff -- 
use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728433 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { + // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) + val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] + subqueryAlias + } else if (sparkSession.version.contains("2.2")) { + // 
SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), + // Project(projList, relation)) + val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan) + .asInstanceOf[SubqueryAlias] + subqueryAlias + } else { + throw new UnsupportedOperationException("Unsupported Spark version") + } } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + // TODO use reflection + // val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse(""))) + val destinationTable = + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728438 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { + // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) + val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] + subqueryAlias + } else if (sparkSession.version.contains("2.2")) { + // 
SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), + // Project(projList, relation)) + val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan) + .asInstanceOf[SubqueryAlias] + subqueryAlias + } else { + throw new UnsupportedOperationException("Unsupported Spark version") + } } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + // TODO use reflection + // val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse(""))) + val destinationTable = + if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val unresolvedrelation = ctor + .newInstance(table.tableIdentifier, + Some(alias.getOrElse(""))).asInstanceOf[UnresolvedRelation] + unresolvedrelation + } else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728527 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,50 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { + val im = rm.reflect(obj) + val sym = im.symbol.typeSignature.member(TermName(name)) + val tableMeta = im.reflectMethod(sym.asMethod).apply() + tableMeta.asInstanceOf[CatalogTable] + } + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { val database = tableIdentifier.database.getOrElse( sparkSession.catalog.currentDatabase) val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => carbonDatasourceHadoopRelation.carbonRelation case LogicalRelation( carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDatasourceHadoopRelation.carbonRelation + +// case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") && +// getField("tableMeta", c) +// .asInstanceOf[CatalogTable].provider +// .isDefined && +// getField("tableMeta", c) +// .asInstanceOf[String] +// .equals("org.apache.spark.sql.CarbonSource") => +// new CarbonSource() +// .createRelation(sparkSession.sqlContext, +// c.tableMeta.storage.properties) +// .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation + + case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") && --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728579 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala --- @@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive( - s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") + val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client + hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") + + sparkSession.sessionState --- End diff -- unused code, remove it. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728764 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.ExperimentalMethods +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.hive.CarbonOptimizerCompileCode.AbstractCarbonOptimizerFactory +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ScalaCompilerUtil + + +private[sql] class CarbonOptimizerCodeGenerateFactory(version: String) { + + val carbonoptimizerFactory = if (version.equals("2.1")) { --- End diff -- use startsWith instead of equals. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728865 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConfFactory.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.sql.hive.CarbonSqlConfCompileCode.AbstractCarbonSqlConfFactory +import org.apache.spark.util.ScalaCompilerUtil + + +private[sql] class CarbonSqlConfCodeGenerateFactory(version: String) { + + val carbonSqlConfFactory = if (version.equals("2.1")) { --- End diff -- use startsWith instead of equals --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152728991 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala --- @@ -32,76 +32,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { /** * To initialize dynamic param defaults along with usage docs */ - def addDefaultCarbonParams(): Unit = { - val ENABLE_UNSAFE_SORT = - SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) - .doc("To enable/ disable unsafe sort.") - .booleanConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) - val CARBON_CUSTOM_BLOCK_DISTRIBUTION = - SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) - .doc("To enable/ disable carbon custom block distribution.") - .booleanConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) - val BAD_RECORDS_LOGGER_ENABLE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) - .doc("To enable/ disable carbon bad record logger.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants - .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) - val BAD_RECORDS_ACTION = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) - .doc("To configure the bad records action.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) - val IS_EMPTY_DATA_BAD_RECORD = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) - .doc("Property to decide weather empty data to be considered bad/ good record.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT - .toBoolean) - val SORT_SCOPE 
= - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) - .doc("Property to specify sort scope.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - val BATCH_SORT_SIZE_INMB = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) - .doc("Property to specify batch sort size in MB.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) - val SINGLE_PASS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) - .doc("Property to enable/disable single_pass.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - val BAD_RECORD_PATH = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) - .doc("Property to configure the bad record location.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - val GLOBAL_SORT_PARTITIONS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) - .doc("Property to configure the global sort partitions.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, - CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) - val DATEFORMAT = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) - .doc("Property to configure data format for date type columns.") - .stringConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - } --- End diff -- why does it need to remove above lines? --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152729085 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -17,19 +17,23 @@ package org.apache.spark.sql.parser +import java.lang.reflect.InvocationTargetException + import scala.collection.mutable import scala.language.implicitConversions import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable} -import org.apache.spark.sql.catalyst.CarbonDDLSqlParser +import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand} import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand} import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand} import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedRelation, UnresolvedStar} --- End diff -- the order of imports is wrong. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152729132 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152729141 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { + if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder + } else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152730468 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { + if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder + } else if (sparkSession.version.contains("2.2")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder --- End diff -- what's the difference between the code for 2.1 and 2.2 --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152730610 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.hive + --- End diff -- remove the empty line --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152730664 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -84,8 +87,8 @@ class CarbonSessionCatalog( var toRefreshRelation = false rtnRelation match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), --- End diff -- the indentation here is wrong --- |
Free forum by Nabble | Edit this page |