Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154119145

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{CreateTablePreExecutionEvent, OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Command to register carbon table from existing carbon table data
+ */
+case class RefreshCarbonTableCommand(
+    dbName: Option[String],
+    tableName: String)
+  extends MetadataCommand {
+  val LOGGER: LogService =
+    LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val databaseName = GetDB.getDatabaseName(dbName, sparkSession)
+    val databaseLocation = GetDB.getDatabaseLocation(databaseName, sparkSession,
+      CarbonProperties.getStorePath)
+    // Steps
+    // 1. get table path
+    // 2. perform the below steps
+    // 2.1 check if the table already register with hive then ignore and continue with the next
+    // schema
+    // 2.2 register the table with the hive check if the table being registered has aggregate
+    // table then do the below steps
+    // 2.2.1 validate that all the aggregate tables are copied at the store location.
+    // 2.2.2 Register the aggregate tables
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
--- End diff --

Why are locks required here? We are not updating any files, right? We are just updating the DB; I think concurrent scenarios can be taken care of internally.

---
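For context, the command only updates metastore state, and the revision quoted later in this thread does drop the explicit locking. A minimal sketch of the two variants, reusing only the helpers already visible in the diff (the registration work itself is elided):

val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
var locks = List.empty[ICarbonLock]
try {
  // variant under review: guard the registration with metadata/drop-table locks
  locks = CommonUtil.acquireLock(locksToBeAcquired, absoluteTableIdentifier)
  // ... register the table with the metastore ...
} finally {
  // release locks after command execution completes
  CommonUtil.releaseLocks(locks)
}

// reviewer's suggestion: since no data files are touched, skip the locks and
// let the metastore handle concurrent registrations internally
// ... register the table with the metastore directly ...

---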
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154119662

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
+  /**
+   * The method register the carbon table with hive
+   *
+   * @param dbName
+   * @param tableName
+   * @param tableInfo
+   * @param sparkSession
+   * @return
+   */
+  def registerTableWithHive(dbName: String,
+      tableName: String,
+      tableInfo: TableInfo)(sparkSession: SparkSession): Any = {
+    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonSchemaString = carbonMetaStore.generateTableSchemaString(tableInfo,
+      tableInfo.getOrCreateAbsoluteTableIdentifier())
+    val tablePath = tableInfo.getTablePath
+    try {
+      val columns = tableInfo.getFactTable.getListOfColumns.asScala.filter(!_.isInvisible)
+      // TODO adding the complex type fields should be handled
+      // getAll Fields
+      val fields = new Array[String](columns.size)
+      columns.map(column => fields(column.getSchemaOrdinal)
+        = column.getColumnName + ' ' + prepareDataType(column))
+      val operationContext = new OperationContext
+      val createTablePreExecutionEvent: RefreshTablePreExecutionEvent =
+        new RefreshTablePreExecutionEvent(sparkSession,
+          tableInfo.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier,
+          tablePath)
+      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
+      sparkSession.sql(
--- End diff --

I think you can directly use CreateTableCommand here instead of creating SQL here.

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1583

SDV Build Fail, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/2008/

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154120941

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -193,6 +197,32 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(
           CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
+      case RefreshTable(tableIdentifier) =>
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
--- End diff --

Here just call `RefreshCarbonTableCommand`, and at the end of that command you can call `RefreshTable`.

---
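A hedged sketch of the suggested match arm, reusing names from the surrounding diff; that the command itself fires Spark's RefreshTable at the end is an assumption here:

case RefreshTable(tableIdentifier) =>
  // route the refresh through the carbon command; the command itself is
  // expected to invoke Spark's RefreshTable once registration succeeds
  ExecutedCommandExec(
    RefreshCarbonTableCommand(
      tableIdentifier.database,
      tableIdentifier.table)) :: Nil

---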
Github user mohammadshahidkhan commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154276544

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
+      case RefreshTable(tableIdentifier) =>
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
--- End diff --

Fixed.

---
Github user mohammadshahidkhan commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154276519

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
+      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
+      sparkSession.sql(
--- End diff --

@ravipesala Please elaborate how to use the same; I don't see any instance.

---
Github user mohammadshahidkhan commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154276628

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
+    if (!sparkSession.sessionState.catalog.listTables(databaseName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
--- End diff --

Moved to CarbonMetaStore.getWrapperTableInfo(absoluteTableIdentifier).

---
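Both forms appear in this thread; roughly, the refactor replaces the inline thrift read-and-convert with a single metastore call:

// before: read the thrift TableInfo and convert it inline
val thriftTableInfo = metaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
val tableInfo = new ThriftWrapperSchemaConverterImpl()
  .fromExternalToWrapperTableInfo(thriftTableInfo, databaseName, tableName,
    absoluteTableIdentifier.getTablePath)

// after: the read and conversion are encapsulated behind the metastore API
val tableInfo = metaStore.getWrapperTableInfo(absoluteTableIdentifier)(sparkSession)

---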
Github user mohammadshahidkhan commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154276652

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
+    // 2.2.2 Register the aggregate tables
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
--- End diff --

Removed.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1625/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1583

SDV Build Fail, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/2016/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1647/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1583

SDV Build Fail, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/2036/

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1583#discussion_r154566404

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---
@@ -0,0 +1,216 @@
+        val dataMapSchemaList = tableInfo.getDataMapSchemaList
+        if (null != dataMapSchemaList && dataMapSchemaList.size() != 0) {
+          // validate all the aggregate tables are copied at the storeLocation
+          val allExists = validateAllAggregateTablePresent(databaseName,
+            dataMapSchemaList, databaseLocation)
+          if (!allExists) {
+            // fail the register operation
+            val msg = s"Table registration with Database name [$databaseName] and Table name " +
+              s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
--- End diff --

Better to print the agg table names which are not copied.

---
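One hedged way to act on this, sketched with helpers already visible in the diff; using getChildSchema.getTableName as the accessor for the aggregate table name is an assumption here:

// sketch: collect the aggregate (child) table names whose directories are
// missing under the database location, and name them in the error message
val missingAggTables = dataMapSchemaList.asScala
  .map(_.getChildSchema.getTableName)  // assumed accessor for the agg table name
  .filterNot { aggTableName =>
    val aggTablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + aggTableName
    FileFactory.isFileExist(aggTablePath, FileFactory.getFileType(aggTablePath))
  }
if (missingAggTables.nonEmpty) {
  val msg = s"Table registration with Database name [$databaseName] and Table name " +
    s"[$tableName] failed. Aggregate tables [${missingAggTables.mkString(", ")}] of " +
    s"table [$tableName] are not copied under database [$databaseName]"
  LOGGER.audit(msg)
  CarbonException.analysisException(msg)
}

---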
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Failed with Spark 2.2.0, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/413/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1583

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/2070/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1685/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Success with Spark 2.2.0, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/418/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1583

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/2076/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1690/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1583

Build Failed with Spark 2.2.0, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/439/

---