akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378309605 ########## File path: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ########## @@ -133,8 +134,9 @@ public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper id BlockDataMap blockletDataMap = loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap, identifierWrapper.getCarbonTable(), - identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(), - identifierWrapper.getConfiguration(), indexInfos); + identifierWrapper.isAddToUnsafe(), + identifierWrapper.getConfiguration(), + identifierWrapper.isSerializeDmStore(), indexInfos); Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585260154 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/268/
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585304077 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1971/
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378768681
########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ##########
@@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) } + protected lazy val indexCommands: Parser[LogicalPlan] = + showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex + + protected lazy val createIndexTable: Parser[LogicalPlan] = + CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~ + ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~ + (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ { + case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp => + + if (!("carbondata".equalsIgnoreCase(indexStoreType) || + "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) {
Review comment: support only "carbondata"
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378748901
########## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ##########
@@ -61,7 +61,8 @@ public DiskBasedDMSchemaStorageProvider(String storePath) { this.storePath = CarbonUtil.checkAndAppendHDFSUrl(storePath); - this.mdtFilePath = storePath + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile"; + this.mdtFilePath = CarbonUtil.checkAndAppendHDFSUrl(
Review comment: this.mdtFilePath = this.storePath + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378752394
########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ##########
@@ -621,6 +624,52 @@ public boolean accept(CarbonFile file) { } } + /** + * this is the clean up added specifically for SI table, because after we merge the data files + * inside the secondary index table, we need to delete the stale carbondata files. + * refer {@link org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD} + */ + private static void cleanUpDataFilesAfterSmallFIlesMergeForSI(CarbonTable table,
Review comment: cleanUpDataFilesAfterSmallFIlesMergeForSI => cleanUpDataFilesAfterSmallFilesMergeForSI
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378744023 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2341,4 +2347,72 @@ private CarbonCommonConstants() { * Default first day of week */ public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY"; + + @CarbonProperty + public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER = + "carbon.infilter.subquery.pushdown.enable"; + + + /** + * CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT + */ + public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT = "false"; + + /** + * key to get broadcast record size from properties + */ + @CarbonProperty + public static final String BROADCAST_RECORD_SIZE = "broadcast.record.size"; + + /** + * default broadcast record size + */ + public static final String DEFAULT_BROADCAST_RECORD_SIZE = "100"; + + /** + * to enable SI lookup partial string + */ + @CarbonProperty + public static final String ENABLE_SI_LOOKUP_PARTIALSTRING = "carbon.si.lookup.partialstring"; + + /** + * default value of ENABLE_SI_LOOKUP_PARTIALSTRING + */ + public static final String ENABLE_SI_LOOKUP_PARTIALSTRING_DEFAULT = "true"; + + /** + * configuration for launching the number of threads during secondary index creation + */ + @CarbonProperty + public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS = + "carbon.secondary.index.creation.threads"; + + /** + * default value configuration for launching the number of threads during secondary + * index creation + */ + public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT = "1"; + + /** + * max value configuration for launching the number of threads during secondary + * index creation + */ + public static final int CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX = 50; + + /** + * Enable SI segment Compaction / merge small files + */ + @CarbonProperty + public static final String 
CARBON_SI_SEGMENT_MERGE = "carbon.si.segment.merge"; + + /** + * Default value for SI segment Compaction / merge small files + * Making this true degrade the LOAD performance + * When the number of small files increase for SI segments(it can happen as number of columns will + * be less and we store position id and reference columns), user an either set to true which will + * merge the data files for upcoming loads or run SI rebuild command which does this job for all + * segments. (REBUILD INDEX <index_table>) + */ + public static final String DEFAULT_CARBON_SI_SEGMENT_MERGE = "false";
Review comment: DEFAULT_CARBON_SI_SEGMENT_MERGE => CARBON_SI_SEGMENT_MERGE_DEFAULT
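The constants above pair each property with a default and, for the thread count, a hard maximum (CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT = 1, CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX = 50). A minimal Java sketch of how such a value could be resolved; the class and method names here are illustrative, not CarbonData's actual API:

```java
// Illustrative resolution of carbon.secondary.index.creation.threads,
// mirroring the default (1) and maximum (50) declared above.
// Class and method names are hypothetical, not CarbonData's API.
class SiThreadConfig {
    static final int DEFAULT_THREADS = 1;
    static final int MAX_THREADS = 50;

    // Parse the user-supplied property value; fall back to the default
    // on bad or non-positive input, and cap at the documented maximum.
    static int resolveThreads(String value) {
        int threads;
        try {
            threads = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return DEFAULT_THREADS;
        }
        if (threads < 1) {
            return DEFAULT_THREADS;
        }
        return Math.min(threads, MAX_THREADS);
    }

    public static void main(String[] args) {
        System.out.println(resolveThreads("8"));    // in range: 8
        System.out.println(resolveThreads("500"));  // capped: 50
        System.out.println(resolveThreads("oops")); // fallback: 1
    }
}
```

Clamping rather than rejecting out-of-range values is one plausible policy; the constants only state the bounds, not how violations are handled.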
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378771821 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala ########## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.secondaryindex.command + +import java.io.IOException +import java.util +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import org.apache.log4j.Logger +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, CarbonRelation} +import org.apache.spark.sql.secondaryindex.exception.IndexTableExistException +import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil +import org.apache.spark.sql.secondaryindex.util.{CarbonInternalScalaUtil, IndexTableUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry, SchemaReader} +import org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, IndexTableInfo} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import 
org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} + +class ErrorMessage(message: String) extends Exception(message) { +} + + /** + * Command for index table creation + * @param indexModel SecondaryIndex model holding the index infomation + * @param tableProperties SI table properties + * @param isCreateSIndex if false then will not create index table schema in the carbonstore + * and will avoid dataload for SI creation. + */ + private[sql] case class CreateIndexTable(indexModel: SecondaryIndex, + tableProperties: scala.collection.mutable.Map[String, String], + var isCreateSIndex: Boolean = true) + extends DataCommand { + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val databaseName = CarbonEnv.getDatabaseName(indexModel.databaseName)(sparkSession) + indexModel.databaseName = Some(databaseName) + val tableName = indexModel.tableName + val storePath = CarbonProperties.getStorePath + val dbLocation = CarbonEnv.getDatabaseLocation(databaseName, sparkSession) + val indexTableName = indexModel.indexTableName + + val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName + setAuditTable(databaseName, indexTableName) + setAuditInfo(Map("Column names" -> indexModel.columnNames.toString(), + "Parent TableName" -> indexModel.tableName, + "SI Table Properties" -> tableProperties.toString())) + LOGGER.info( + s"Creating Index with Database name [$databaseName] and Index name [$indexTableName]") + val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val identifier = TableIdentifier(tableName, indexModel.databaseName) + var carbonTable: CarbonTable = null + var locks: List[ICarbonLock] = List() + var oldIndexInfo = "" + + try { + carbonTable = CarbonEnv.getCarbonTable(indexModel.databaseName, tableName)(sparkSession) + if (carbonTable == null) { + 
throw new ErrorMessage(s"Parent Table $databaseName.$tableName is not found") + } + + if (carbonTable != null && + (carbonTable.isFileLevelFormat || !carbonTable.getTableInfo.isTransactionalTable)) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") + } + + if (carbonTable.isStreamingSink) { + throw new ErrorMessage( + s"Parent Table ${ carbonTable.getDatabaseName }." + + s"${ carbonTable.getTableName }" + + s" is Streaming Table and Secondary index on Streaming table is not supported ") + } + + if (carbonTable.isHivePartitionTable) { + throw new ErrorMessage( + s"Parent Table ${ carbonTable.getDatabaseName }." + + s"${ carbonTable.getTableName }" + + s" is Partition Table and Secondary index on Partition table is not supported ")
Review comment: is it easy to support it in the future?
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378774558
########## File path: secondary_index/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ##########
@@ -0,0 +1,17 @@ +## ------------------------------------------------------------------------ +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## ------------------------------------------------------------------------ +org.apache.spark.sql.CarbonSource
Review comment: already exists in integration/spark2 module
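For context, a META-INF/services file like the one above is a standard java.util.ServiceLoader provider-configuration file: Spark discovers DataSourceRegister implementations such as CarbonSource by reading it from the classpath, which is why a duplicate copy in another module is redundant. A minimal, self-contained sketch of the mechanism; the Register interface here is a stand-in, not Spark's actual trait:

```java
import java.util.ServiceLoader;

// Minimal sketch of the java.util.ServiceLoader mechanism behind
// META-INF/services files like the one above: each line of the file
// names an implementation class, and ServiceLoader instantiates the
// providers at lookup time. Register is a stand-in for Spark's
// DataSourceRegister interface.
class ServiceLookup {
    interface Register {
        String shortName();
    }

    public static void main(String[] args) {
        // With a classpath resource
        // META-INF/services/ServiceLookup$Register listing one
        // implementation class per line, this loop would print each
        // provider's short name; with no such file it prints nothing.
        for (Register r : ServiceLoader.load(Register.class)) {
            System.out.println(r.shortName());
        }
    }
}
```

Because ServiceLoader merges every matching resource on the classpath, registering the same provider in two modules yields duplicate entries rather than anything useful.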
QiangCai commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378769971 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ########## @@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) } + protected lazy val indexCommands: Parser[LogicalPlan] = + showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex + + protected lazy val createIndexTable: Parser[LogicalPlan] = + CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~ + ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~ + (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ { + case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp => + + if (!("carbondata".equalsIgnoreCase(indexStoreType) || + "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) { + sys.error("Not a carbon format request") + } + + val (dbName, tableName) = table match { + case databaseName ~ tableName => (databaseName, tableName.toLowerCase()) + } + + val tableProperties = if (tblProp.isDefined) { + val tblProps = tblProp.get.map(f => f._1 -> f._2) + scala.collection.mutable.Map(tblProps: _*) + } else { + scala.collection.mutable.Map.empty[String, String] + } + // validate the tableBlockSize from table properties + CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE) + // validate for supported table properties + validateTableProperties(tableProperties) + // validate column_meta_cache proeperty if defined + val tableColumns: List[String] = cols.map(f => f.toLowerCase) + validateColumnMetaCacheAndCacheLevelProeprties(dbName, + indexTableName.toLowerCase, + tableColumns, + tableProperties) + validateColumnCompressorProperty(tableProperties + .getOrElse(CarbonCommonConstants.COMPRESSOR, null)) + val indexTableModel = SecondaryIndex(dbName, + tableName.toLowerCase, + tableColumns, + 
indexTableName.toLowerCase) + CreateIndexTable(indexTableModel, tableProperties) + } + + private def validateColumnMetaCacheAndCacheLevelProeprties(dbName: Option[String], + tableName: String, + tableColumns: Seq[String], + tableProperties: scala.collection.mutable.Map[String, String]): Unit = { + // validate column_meta_cache property + if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) { + CommonUtil.validateColumnMetaCacheFields( + dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME), + tableName, + tableColumns, + tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get, + tableProperties) + } + // validate cache_level property + if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) { + CommonUtil.validateCacheLevel( + tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get, + tableProperties) + } + } + + private def validateColumnCompressorProperty(columnCompressor: String): Unit = { + // Add validatation for column compressor when creating index table + try { + if (null != columnCompressor) { + CompressorFactory.getInstance().getCompressor(columnCompressor) + } + } catch { + case ex: UnsupportedOperationException => + throw new InvalidConfigurationException(ex.getMessage) + } + } + + /** + * this method validates if index table properties contains other than supported ones + * + * @param tableProperties + */ + private def validateTableProperties(tableProperties: scala.collection.mutable.Map[String, + String]) = { + val supportedPropertiesForIndexTable = Seq("TABLE_BLOCKSIZE", + "COLUMN_META_CACHE", + "CACHE_LEVEL", + CarbonCommonConstants.COMPRESSOR.toUpperCase) + tableProperties.foreach { property => + if (!supportedPropertiesForIndexTable.contains(property._1.toUpperCase)) { + val errorMessage = "Unsupported Table property in index creation: " + property._1.toString + throw new MalformedCarbonCommandException(errorMessage) + } + } + } + + protected lazy val dropIndexTable: Parser[LogicalPlan] = + 
DROP ~> INDEX ~> opt(IF ~> EXISTS) ~ ident ~ (ON ~> (ident <~ ".").? ~ ident) <~ opt(";") ^^ { + case ifexist ~ indexTableName ~ table => + val (dbName, tableName) = table match { + case databaseName ~ tableName => (databaseName, tableName.toLowerCase()) + } + DropIndexCommand(ifexist.isDefined, dbName, indexTableName.toLowerCase, tableName) + } + + protected lazy val showIndexes: Parser[LogicalPlan] = + (SHOW ~> opt(FORMATTED)) ~> (INDEXES | INDEX) ~> ON ~> ident ~ opt((FROM | IN) ~> ident) <~
Review comment: how about `(SHOW ~> INDEXES ~> ON ~> (ident <~ ".").? ~ ident`
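The simplified combinator QiangCai suggests accepts statements of the form `SHOW INDEXES ON [db.]table`: an optional dotted database prefix followed by the table name. A rough Java regex stand-in for that shape; the class and method names are hypothetical, and this is not the actual Scala parser:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Regex stand-in for the simplified combinator
// SHOW ~> INDEXES ~> ON ~> (ident <~ ".").? ~ ident:
// an optional "db." prefix followed by the table name.
// Illustrative only, not CarbonData's parser.
class ShowIndexesParser {
    private static final Pattern SHOW_INDEXES = Pattern.compile(
        "(?i)SHOW\\s+INDEXES\\s+ON\\s+(?:(\\w+)\\.)?(\\w+)\\s*;?");

    // Returns {database-or-null, table} on a match, else null.
    static String[] parse(String sql) {
        Matcher m = SHOW_INDEXES.matcher(sql.trim());
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2).toLowerCase() };
    }

    public static void main(String[] args) {
        String[] r = parse("SHOW INDEXES ON db1.maintable");
        System.out.println(r[0] + "." + r[1]); // db1.maintable
    }
}
```

As in the Scala grammar, the table name is lowercased and the database part stays optional, defaulting to the current database when absent.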
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378796482
########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala ##########
Review comment: yes, there will be some issues in the pruning part, but we can support it
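The creation command discussed above rejects a missing parent table, a non-transactional table, a streaming sink, and (for now, per the exchange above) a partitioned table, in that order. A minimal Java sketch of that validation order; TableMeta, its flags, and the method names are stand-ins for CarbonTable, not real CarbonData classes:

```java
// Sketch of the SI pre-creation checks, in the order the command
// applies them. TableMeta and its flags are stand-ins for
// CarbonTable, not real CarbonData classes.
class SiPreChecks {
    static class TableMeta {
        final boolean transactional;
        final boolean streamingSink;
        final boolean hivePartitioned;
        TableMeta(boolean transactional, boolean streamingSink, boolean hivePartitioned) {
            this.transactional = transactional;
            this.streamingSink = streamingSink;
            this.hivePartitioned = hivePartitioned;
        }
    }

    // Returns null when SI creation may proceed, else the rejection reason.
    static String validate(TableMeta table) {
        if (table == null) {
            return "parent table not found";
        }
        if (!table.transactional) {
            return "unsupported operation on non-transactional table";
        }
        if (table.streamingSink) {
            return "secondary index on streaming table is not supported";
        }
        if (table.hivePartitioned) {
            return "secondary index on partition table is not supported";
        }
        return null;
    }
}
```

Ordering the checks cheapest-first and failing fast keeps the locking and schema-evolution work later in the command from ever starting on an unsupported table.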
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378796921
########## File path: secondary_index/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ##########
Review comment: removed
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811841 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2341,4 +2347,72 @@ private CarbonCommonConstants() { * Default first day of week */ public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY"; + + @CarbonProperty + public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER = + "carbon.infilter.subquery.pushdown.enable"; + + + /** + * CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT + */ + public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT = "false"; + + /** + * key to get broadcast record size from properties + */ + @CarbonProperty + public static final String BROADCAST_RECORD_SIZE = "broadcast.record.size"; + + /** + * default broadcast record size + */ + public static final String DEFAULT_BROADCAST_RECORD_SIZE = "100"; + + /** + * to enable SI lookup partial string + */ + @CarbonProperty + public static final String ENABLE_SI_LOOKUP_PARTIALSTRING = "carbon.si.lookup.partialstring"; + + /** + * default value of ENABLE_SI_LOOKUP_PARTIALSTRING + */ + public static final String ENABLE_SI_LOOKUP_PARTIALSTRING_DEFAULT = "true"; + + /** + * configuration for launching the number of threads during secondary index creation + */ + @CarbonProperty + public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS = + "carbon.secondary.index.creation.threads"; + + /** + * default value configuration for launching the number of threads during secondary + * index creation + */ + public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT = "1"; + + /** + * max value configuration for launching the number of threads during secondary + * index creation + */ + public static final int CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX = 50; + + /** + * Enable SI segment Compaction / merge small files + */ + @CarbonProperty + public static final String 
CARBON_SI_SEGMENT_MERGE = "carbon.si.segment.merge"; + + /** + * Default value for SI segment Compaction / merge small files + * Making this true degrade the LOAD performance + * When the number of small files increase for SI segments(it can happen as number of columns will + * be less and we store position id and reference columns), user an either set to true which will + * merge the data files for upcoming loads or run SI rebuild command which does this job for all + * segments. (REBUILD INDEX <index_table>) + */ + public static final String DEFAULT_CARBON_SI_SEGMENT_MERGE = "false"; Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
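The SI-related keys in this hunk are ordinary Carbon properties, so they can be tuned in carbon.properties like any other. A minimal sketch of such a configuration (the values shown are illustrative, not recommendations):

```
# Merge small carbondata files of SI segments during upcoming loads
# (default "false"; enabling this can degrade LOAD performance)
carbon.si.segment.merge=true

# Number of threads launched during secondary index creation
# (default "1", capped at 50 per CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX)
carbon.secondary.index.creation.threads=4

# Allow SI lookup for partial-string filters (default "true")
carbon.si.lookup.partialstring=true
```

For segments already loaded before `carbon.si.segment.merge` is enabled, the hunk's own comment points to `REBUILD INDEX <index_table>` as the way to merge their small files.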
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811867

File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java

@@ -61,7 +61,8 @@ public DiskBasedDMSchemaStorageProvider(String storePath) {
     this.storePath = CarbonUtil.checkAndAppendHDFSUrl(storePath);
-    this.mdtFilePath = storePath + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+    this.mdtFilePath = CarbonUtil.checkAndAppendHDFSUrl(

Review comment: done
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811891

File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java

@@ -621,6 +624,52 @@ public boolean accept(CarbonFile file) {
     }
   }

+  /**
+   * this is the clean up added specifically for the SI table, because after we merge the data
+   * files inside the secondary index table, we need to delete the stale carbondata files.
+   * refer {@link org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD}
+   */
+  private static void cleanUpDataFilesAfterSmallFIlesMergeForSI(CarbonTable table,

Review comment: done
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811905

File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala

@@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
   }

+  protected lazy val indexCommands: Parser[LogicalPlan] =
+    showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex
+
+  protected lazy val createIndexTable: Parser[LogicalPlan] =
+    CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp =>
+
+        if (!("carbondata".equalsIgnoreCase(indexStoreType) ||
+              "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) {

Review comment: done
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811931

File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala

@@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
   }

+  protected lazy val indexCommands: Parser[LogicalPlan] =
+    showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex
+
+  protected lazy val createIndexTable: Parser[LogicalPlan] =
+    CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp =>
+
+        if (!("carbondata".equalsIgnoreCase(indexStoreType) ||
+              "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) {
+          sys.error("Not a carbon format request")
+        }
+
+        val (dbName, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+
+        val tableProperties = if (tblProp.isDefined) {
+          val tblProps = tblProp.get.map(f => f._1 -> f._2)
+          scala.collection.mutable.Map(tblProps: _*)
+        } else {
+          scala.collection.mutable.Map.empty[String, String]
+        }
+        // validate the tableBlockSize from table properties
+        CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
+        // validate for supported table properties
+        validateTableProperties(tableProperties)
+        // validate column_meta_cache property if defined
+        val tableColumns: List[String] = cols.map(f => f.toLowerCase)
+        validateColumnMetaCacheAndCacheLevelProeprties(dbName,
+          indexTableName.toLowerCase,
+          tableColumns,
+          tableProperties)
+        validateColumnCompressorProperty(tableProperties
+          .getOrElse(CarbonCommonConstants.COMPRESSOR, null))
+        val indexTableModel = SecondaryIndex(dbName,
+          tableName.toLowerCase,
+          tableColumns,
+          indexTableName.toLowerCase)
+        CreateIndexTable(indexTableModel, tableProperties)
+    }
+
+  private def validateColumnMetaCacheAndCacheLevelProeprties(dbName: Option[String],
+      tableName: String,
+      tableColumns: Seq[String],
+      tableProperties: scala.collection.mutable.Map[String, String]): Unit = {
+    // validate column_meta_cache property
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+      CommonUtil.validateColumnMetaCacheFields(
+        dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+        tableName,
+        tableColumns,
+        tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+        tableProperties)
+    }
+    // validate cache_level property
+    if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+      CommonUtil.validateCacheLevel(
+        tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
+        tableProperties)
+    }
+  }
+
+  private def validateColumnCompressorProperty(columnCompressor: String): Unit = {
+    // validate the column compressor when creating an index table
+    try {
+      if (null != columnCompressor) {
+        CompressorFactory.getInstance().getCompressor(columnCompressor)
+      }
+    } catch {
+      case ex: UnsupportedOperationException =>
+        throw new InvalidConfigurationException(ex.getMessage)
+    }
+  }
+
+  /**
+   * this method validates that the index table properties contain only supported ones
+   *
+   * @param tableProperties
+   */
+  private def validateTableProperties(tableProperties: scala.collection.mutable.Map[String,
+      String]) = {
+    val supportedPropertiesForIndexTable = Seq("TABLE_BLOCKSIZE",
+      "COLUMN_META_CACHE",
+      "CACHE_LEVEL",
+      CarbonCommonConstants.COMPRESSOR.toUpperCase)
+    tableProperties.foreach { property =>
+      if (!supportedPropertiesForIndexTable.contains(property._1.toUpperCase)) {
+        val errorMessage = "Unsupported Table property in index creation: " + property._1.toString
+        throw new MalformedCarbonCommandException(errorMessage)
+      }
+    }
+  }
+
+  protected lazy val dropIndexTable: Parser[LogicalPlan] =
+    DROP ~> INDEX ~> opt(IF ~> EXISTS) ~ ident ~ (ON ~> (ident <~ ".").? ~ ident) <~ opt(";") ^^ {
+      case ifexist ~ indexTableName ~ table =>
+        val (dbName, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+        DropIndexCommand(ifexist.isDefined, dbName, indexTableName.toLowerCase, tableName)
+    }
+
+  protected lazy val showIndexes: Parser[LogicalPlan] =
+    (SHOW ~> opt(FORMATTED)) ~> (INDEXES | INDEX) ~> ON ~> ident ~ opt((FROM | IN) ~> ident) <~

Review comment: done
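Read as a grammar, the combinators above accept DDL of roughly the following shape (the table, index, and column names here are hypothetical, chosen only to illustrate the syntax):

```sql
-- createIndexTable: only 'carbondata' or 'org.apache.carbondata.format'
-- is accepted as the store type; TBLPROPERTIES is optional and restricted
-- to TABLE_BLOCKSIZE, COLUMN_META_CACHE, CACHE_LEVEL and the compressor key
CREATE INDEX idx_name ON TABLE db1.source_table (name)
AS 'carbondata'
TBLPROPERTIES ('TABLE_BLOCKSIZE'='256');

-- dropIndexTable: IF EXISTS is optional, database qualifier is optional
DROP INDEX IF EXISTS idx_name ON db1.source_table;

-- showIndexes: FORMATTED is optional; FROM/IN selects the database
SHOW INDEXES ON source_table IN db1;
```

The lowercase normalization in the parser (`toLowerCase` on table, index, and column names) means these identifiers are treated case-insensitively, consistent with the rest of the Carbon DDL parser.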
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585720917

Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/275/
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585748604

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1978/
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585768043

Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/278/