Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2103
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4164/
---
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/2103
retest this please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3432/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4659/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2103
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4166/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4801/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3578/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3579/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4802/
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179949877

Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala

    @@ -43,16 +43,19 @@ import org.apache.carbondata.streaming.CarbonStreamingQueryListener
      * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
      */
     class CarbonSession(@transient val sc: SparkContext,
    -    @transient private val existingSharedState: Option[SharedState]
    +    @transient private val existingSharedState: Option[SharedState],
    +    useHiveMetaStore: Boolean = true

--- End diff --

I think it should be @transient also
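A hedged sketch of the change being requested, assuming the flag is only consulted while the session state is built and so need not survive serialization of the session:

```scala
// Sketch only: mark the new constructor flag @transient, matching the
// neighbouring fields, since it is read once at construction time.
class CarbonSession(@transient val sc: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val useHiveMetaStore: Boolean = true) {
  // ... body unchanged ...
}
```

---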
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950086

Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala

    @@ -124,15 +127,30 @@ object CarbonSession {
         getOrCreateCarbonSession(null, null)
       }

    +  def getOrCreateCarbonSessionWithOutHive(): SparkSession = {

--- End diff --

instead of providing this, can you add `def enableInMemCatalog(): CarbonBuilder`, so that user can do:
```
SparkSession.builder
  .enableInMemCatalog
  .getOrCreateCarbonSession(x,x)
```
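One way the suggested builder method could look. This is a sketch, assuming `CarbonBuilder` wraps `SparkSession.Builder` and that the in-memory catalog can be selected through Spark's standard `spark.sql.catalogImplementation` switch; neither assumption is confirmed as the final API in this thread:

```scala
object CarbonSession {

  implicit class CarbonBuilder(builder: SparkSession.Builder) {

    // Select Spark's built-in in-memory catalog instead of the Hive
    // metastore; "in-memory" is the standard value of this Spark setting.
    def enableInMemCatalog(): SparkSession.Builder = {
      builder.config("spark.sql.catalogImplementation", "in-memory")
    }

    def getOrCreateCarbonSession(storePath: String,
        metaStorePath: String): SparkSession = {
      // ... existing CarbonSession construction, which can read the
      // catalog choice back from the builder's config ...
      ???
    }
  }
}
```

With the implicit class in scope (e.g. via `import CarbonSession._`), the chained call in the comment above works because `config` returns the `Builder` itself, which is then re-wrapped for `getOrCreateCarbonSession`.

---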
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950203

Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala

    @@ -85,11 +85,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           schemaEvolutionEntry.setAdded(newCols.toList.asJava)
           val thriftTable = schemaConverter
             .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
    -      AlterTableUtil
    +      val alterTableVo = AlterTableUtil
            .updateSchemaInfo(carbonTable,

--- End diff --

change to ```AlterTableUtil.updateSchemaInfo(```
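That is, break after the opening parenthesis rather than between the object and the method. A sketch of the preferred call style; the argument list beyond `carbonTable` is inferred from the surrounding diff and is an assumption:

```scala
// Style suggested by the reviewer: keep "AlterTableUtil.updateSchemaInfo("
// together on one line and wrap the arguments instead.
val alterTableVo = AlterTableUtil.updateSchemaInfo(
  carbonTable,
  schemaEvolutionEntry,
  thriftTable)(sparkSession)
```

---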
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950228

Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala

    @@ -85,11 +85,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           schemaEvolutionEntry.setAdded(newCols.toList.asJava)
           val thriftTable = schemaConverter
             .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
    -      AlterTableUtil
    +      val alterTableVo = AlterTableUtil

--- End diff --

Better to return tuple instead of return alterTableVo
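A sketch of the tuple-returning alternative; the element names are illustrative stand-ins for whatever fields `AlterTableVo` currently carries, not the actual signature:

```scala
// Sketch only: let updateSchemaInfo hand its results back as a tuple,
// so callers destructure directly instead of going through a value object.
val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
  carbonTable, schemaEvolutionEntry, thriftTable)(sparkSession)
```

---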
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950250

Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala

    @@ -63,5 +67,39 @@
       /**
        * Update the storageformat with new location information
        */
    -  def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat
    +  def updateStorageLocation(
    +      path: Path,
    +      storage: CatalogStorageFormat,
    +      newTableName: String,
    +      dbName: String): CatalogStorageFormat
    +
    +  /**
    +   * Method used to update the table name
    +   * @param tableRenameVo
    +   */
    +  def alterTableRename(tableRenameVo: TableRenameVo): Unit

--- End diff --

please do not use VO object but put 3 parameters
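A sketch of the unbundled signature. The first two parameters come from `TableRenameVo`'s fields as used elsewhere in this PR; the third (the new table path) is an assumption about what the VO also carries:

```scala
/**
 * Method used to update the table name (sketch of the reviewer's
 * suggestion: plain parameters in place of the TableRenameVo wrapper).
 */
def alterTableRename(
    oldTableIdentifier: TableIdentifier,
    newTableIdentifier: TableIdentifier,
    newTablePath: String): Unit
```

---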
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950297

Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala

    @@ -0,0 +1,253 @@
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.command.{AlterTableVo, TableRenameVo}
    +import org.apache.spark.sql.execution.datasources._
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
    +import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
    +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +import org.apache.spark.sql.types.{StructField, StructType}
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
    +import org.apache.spark.util.CarbonReflectionUtils
    +
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.format.TableInfo
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonInMemorySessionCatalog(

--- End diff --

I think you can name it `InMemorySessionCatalog` and change the file name also

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950322

Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala (the same hunk as in the previous comment, anchored at)

    +class CarbonInMemorySessionCatalog(

--- End diff --

Can you also restrict the class scope, like putting `private[spark]`
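In code, the suggested restriction would look like the following sketch (also applying the rename proposed in the previous comment):

```scala
// Visible only within the org.apache.spark package tree, so user code
// cannot depend on this catalog implementation directly.
private[spark] class InMemorySessionCatalog(
    externalCatalog: ExternalCatalog,
    /* ... remaining constructor parameters as in the diff ... */ )
```

---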
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950328

Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala

    @@ -0,0 +1,253 @@
    +package org.apache.spark.sql.hive

--- End diff --

license is missing
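For reference, the Apache license header that would go above the package declaration; this is the same ASF header the PR already uses in CarbonSqlAstBuilder.scala (quoted further below in this thread):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
```

---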
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950352

Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala (imports and class doc as quoted above, followed by)

    +class CarbonInMemorySessionCatalog(
    +    externalCatalog: ExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends SessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) with CarbonSessionCatalog {
    +
    +  override def alterTableRename(tableRenameVo: TableRenameVo): Unit = {
    +    sparkSession.sessionState.catalog.renameTable(
    +      tableRenameVo.oldTableIdentifier,
    +      tableRenameVo.newTableIdentifier)
    +  }
    +
    +  override def alterTable(alterTableVo: AlterTableVo): Unit = {
    +    // NOt Required in case of In-memory catalog
    +  }
    +
    +  override def alterAddColumns(alterTableVo: AlterTableVo): Unit = {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val structType = catalogTable.schema
    +    var newStructType = structType
    +    alterTableVo.newColumns.get.foreach { cols =>
    +      newStructType = structType
    +        .add(cols.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
    +    }
    +    alterSchema(newStructType, catalogTable, alterTableVo.tableIdentifier)
    +  }
    +
    +  override def alterDropColumns(alterTableVo: AlterTableVo): Unit = {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val fields = catalogTable.schema.fields.filterNot { field =>
    +      alterTableVo.newColumns.get.exists { col =>
    +        col.getColumnName.equalsIgnoreCase(field.name)
    +      }
    +    }
    +    alterSchema(new StructType(fields), catalogTable, alterTableVo.tableIdentifier)
    +  }
    +
    +  override def alterColumnChangeDataType(alterTableVo: AlterTableVo): Unit = {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val a = catalogTable.schema.fields.flatMap { field =>
    +      alterTableVo.newColumns.get.map { col =>
    +        if (col.getColumnName.equalsIgnoreCase(field.name)) {
    +          StructField(col.getColumnName,
    +            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
    +        } else {
    +          field
    +        }
    +      }
    +    }
    +    alterSchema(new StructType(a), catalogTable, alterTableVo.tableIdentifier)
    +  }
    +
    +  private def alterSchema(structType: StructType,
    +      catalogTable: CatalogTable,
    +      tableIdentifier: TableIdentifier): Unit = {
    +    val copy = catalogTable.copy(schema = structType)
    +    sparkSession.sessionState.catalog.alterTable(copy)
    +    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
    +  }
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +  def getCarbonEnv(): CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +  // Initialize all listeners to the Operation bus.
    +  CarbonEnv.initListeners()
    +
    +  def getThriftTableInfo(tablePath: String): TableInfo = {
    +    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
    +    CarbonUtil.readSchemaFile(tableMetadataFile)
    +  }
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    val isRelationRefreshed =
    +      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
    +    if (isRelationRefreshed) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +
    +  /**
    +   * returns hive client from HiveExternalCatalog
    +   *
    +   * @return
    +   */
    +  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
    +    null
    +  }
    +
    +  override def createPartitions(
    +      tableName: TableIdentifier,
    +      parts: Seq[CatalogTablePartition],
    +      ignoreIfExists: Boolean): Unit = {
    +    try {
    +      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
    +      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
    +      super.createPartitions(tableName, updatedParts, ignoreIfExists)
    +    } catch {
    +      case e: Exception =>
    +        super.createPartitions(tableName, parts, ignoreIfExists)
    +    }
    +  }
    +
    +  /**
    +   * This is alternate way of getting partition information. It first fetches all partitions from
    +   * hive and then apply filter instead of querying hive along with filters.
    +   * @param partitionFilters
    +   * @param sparkSession
    +   * @param identifier
    +   * @return
    +   */
    +  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier) = {
    +    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
    +  }
    +
    +  /**
    +   * Update the storageformat with new location information
    +   */
    +  override def updateStorageLocation(
    +      path: Path,
    +      storage: CatalogStorageFormat,
    +      newTableName: String,
    +      dbName: String): CatalogStorageFormat = {
    +    storage.copy(locationUri = Some(path.toUri))
    +  }
    +}
    +
    +class CarbonSessionStateBuilderWithoutHive (sparkSession: SparkSession,

--- End diff --

change to `CarbonInMemorySessionStateBuilder`

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950362

Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala

    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
    +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SparkSqlAstBuilder
    +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
    +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
    +import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
    +import org.apache.spark.sql.types.DecimalType
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
    +  extends SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
    +
    +  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("carbondata") ||
    +        fileStorage.equalsIgnoreCase("'carbonfile'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
    +        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
    +        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
    +      helper.createCarbonTable(createTableTuple)
    +    } else {
    +      super.visitCreateHiveTable(ctx)
    +    }
    +  }
    +
    +  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
    +
    +    val newColumn = visitColType(ctx.colType)
    +    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
    +      throw new MalformedCarbonCommandException(
    +        "Column names provided are different. Both the column names should be same")
    +    }
    +
    +    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
    +      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
    +      case _ => (newColumn.dataType.typeName.toLowerCase, None)
    +    }
    +
    +    val alterTableChangeDataTypeModel =
    +      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
    +        new CarbonSpark2SqlParser()
    +          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
    +        ctx.tableIdentifier().table.getText.toLowerCase,
    +        ctx.identifier.getText.toLowerCase,
    +        newColumn.name.toLowerCase)
    +
    +    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
    +  }
    +
    +
    +  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
    +

--- End diff --

remove empty line
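As a worked illustration of the type-string extraction in `visitChangeColumn` above, a self-contained sketch using Spark's `DecimalType`, which exposes `precision` and `scale`:

```scala
import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType}

// Decimal keeps its precision/scale so the parser can rebuild the exact type.
val (s1, v1) = (DecimalType(10, 2): DataType) match {
  case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
  case t => (t.typeName.toLowerCase, None)
}
// s1 == "decimal", v1 == Some(List((10, 2)))

// Any other type just passes its lowercase type name through.
val (s2, v2) = (IntegerType: DataType) match {
  case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
  case t => (t.typeName.toLowerCase, None)
}
// s2 == "integer", v2 == None
```

---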
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2103
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5135/
---