CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676194037

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2047/
vikramahuja1001 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676196273

@VenuReddy2103 @akashrn5 @QiangCai, please review this PR.
vikramahuja1001 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676458977

retest this please
brijoobopanna commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676461075

@vikramahuja1001 please complete all implementations; once WIP is removed we can start the review.
vikramahuja1001 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676462137

okay
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676473431

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2050/
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676474301

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3792/
vikramahuja1001 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676484171

retest this please
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676491522

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3796/
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676494128

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2054/
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-676589566
akashrn5 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r474030501

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala ##########

```
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
```

Review comment: you should also consider adding a query and checking whether it hits the SI after reindex. Please check for all test cases.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+    dbName: String,
+    segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
```

Review comment: why do we need both the arguments `indexname.isEmpty` and `indexname`? You can just send `indexName`, then check whether it is defined or not and take decisions in the function implementation.
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
```

Review comment: comment is not clear.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
```

Review comment:
```suggestion
      triggerIndexRepair(tableNameOp.table, databaseName, indexname.isDefined, indexname, segments)
```
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
```

Review comment: before reading the table status file, get a lock and then read, to avoid concurrency issues.
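A minimal sketch of the locking pattern being asked for, assuming CarbonData's `CarbonLockFactory`/`LockUsage.TABLE_STATUS_LOCK` lock API; the surrounding names (`carbonTable`, `carbonLoadModel`, `tableStatusFilePath`, `tableNameOp`) are from the PR, and the failure handling is illustrative:

```scala
import scala.collection.JavaConverters._

import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager

// Hold the table status lock while reading the file so that a concurrent
// load or compaction cannot rewrite it mid-read.
val statusLock: ICarbonLock = CarbonLockFactory.getCarbonLockObj(
  carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
try {
  if (statusLock.lockWithRetries()) {
    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
      .readTableStatusFile(tableStatusFilePath).toList.asJava)
  } else {
    // illustrative failure handling
    throw new RuntimeException(s"Unable to acquire table status lock for $tableNameOp")
  }
} finally {
  statusLock.unlock()
}
```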
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+    dbName: String,
+    segments: Option[List[String]]) extends DataCommand{
```

Review comment: move all the variables to the next line and correct the formatting; refer to other case classes.

########## File path: docs/index/secondary-index-guide.md ##########

```
+### Repair index Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments in main table and the SI table
```

Review comment:
```suggestion
of segments with main table.
```
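For reference, the REINDEX variants the new test suite exercises look like this; the table and index names are the ones used in the tests:

```scala
// Repair one SI table against its main table
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")

// Repair only specific main-table segments
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")

// Main table living in another database, without USE
sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
```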
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
```

Review comment: comment is not proper, please correct it according to the functionality.

########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala ##########

```
+/**
+ * test cases for testing create index table
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
```

Review comment: test cases for testing repair index table, right?

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
```

Review comment: rename this to a proper name.
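A hedged sketch of that block with a clearer name and idiomatic filtering in place of the nested foreach over a mutable list; the `allMainTableDetails` name is illustrative:

```scala
// Read the main-table segment metadata once, then keep only the requested segments.
val allMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val mainTableDetails = segments match {
  case Some(segmentIds) => allMainTableDetails.filter(d => segmentIds.contains(d.getLoadName))
  case None => allMainTableDetails
}
```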
########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala ##########

```
+  test("reindex command after deleting segments from SI table on other database without use") {
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+    sql("create database test")
+    sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+  }
+
+  test("reindex command using segment.id after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")
+    val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(postDeleteSegments + 2 == postFirstRepair)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")
```

Review comment: add a query before the reindex; it should not hit the SI table.
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
```

Review comment:
```suggestion
case class IndexRepairCommand(indexNameOp: Option[String], tableIdentifier: TableIdentifier,
```

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+    segments: Option[List[String]]) extends DataCommand{
```

Review comment:
```suggestion
    segments: Option[List[String]]) extends DataCommand {
```
Correct the formatting.
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+      val databaseName = if (tableNameOp.database.isEmpty) {
```

Review comment: `tableNameOp.database` gives an `Option`, right? You can use `isDefined`.
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+        tableNameOp.database.get.toString
```

Review comment: it's a redundant operation to do `toString` on an already-String value.
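Both of those remarks collapse into one idiomatic line, using the PR's own names:

```scala
// tableNameOp.database is already an Option[String]; no isEmpty/get/toString needed.
val databaseName = tableNameOp.database
  .getOrElse(SparkSession.getActiveSession.get.catalog.currentDatabase)
```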
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+    } else {
+      // for all tables in the db
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
```

Review comment: rewrite the comment in a more meaningful way.
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
```

Review comment:
```suggestion
              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
```
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+        } else {
+          var isPresent : Boolean = false
```

Review comment: why is this boolean required?
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ##########

```
+        val mainTableDetails = if (segments.isEmpty) {
```

Review comment:
```suggestion
        val mainTableDetails = if (segments.isDefined) {
```
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+        } else {
+          var isPresent : Boolean = false
+          indexTables.foreach {
+            indexTableName =>
+              if (indexTableName == indexName.get) {

Review comment:
   You can rename `indexName` to something more meaningful, like `indexTableToRepair`.
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+          var isPresent : Boolean = false
+          indexTables.foreach {
+            indexTableName =>
+              if (indexTableName == indexName.get) {
+                isPresent = true
+                CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                  indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+              }
+          }

Review comment:
   ```
   indexTables.foreach {
     indexTableName =>
       if (indexTableName == indexName.get) {
         isPresent = true
         CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
           indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
       }
   }
   ```
   Remove this code and use a simpler form, `indexTables.filter(indexTable => indexTable.equals(indexName.get))`, then pass the output to the processSIRepair method.
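A hedged sketch of the filter-based form the comment asks for, with the emptiness check standing in for whatever missing-index handling the original `isPresent` flag fed into:

```
// Sketch: repair only the named index table, without a mutable flag.
val matchingTables = indexTables.filter(indexTable => indexTable.equals(indexName.get))
matchingTables.foreach { indexTableName =>
  CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
    indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
}
// matchingTables.isEmpty here would mean no index table with that name exists.
```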
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)

Review comment:
   No need to send `secondaryIndexProvider`.
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()

Review comment:
   Remove this temp list; use Scala collections instead.
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
   We are already reading the table status file at line number 80; no need to read it again and again. Avoid the reads at lines 92 and 95.
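A sketch of the read-once shape the comment suggests — the details loaded for the CarbonLoadModel get reused instead of re-reading the table status file. Names are taken from the quoted diff; that the committed fix looks exactly like this is an assumption:

```
// Sketch: read the table status file once, share the result.
val details = SegmentStatusManager.readTableStatusFile(tableStatusFilePath)
carbonLoadModel.setLoadMetadataDetails(details.toList.asJava)
// ...later, derive mainTableDetails from the same `details` array instead of
// calling SegmentStatusManager.readLoadMetadata again.
```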
##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }

Review comment:
   ```
   segments.get.foreach(segmentNo => {
     tempLoadMetaData.foreach(metadata =>
       if (metadata.getLoadName.equals(segmentNo)) {
         listLoadMetaDataDetails.add(metadata)
       })
   })
   listLoadMetaDataDetails.asScala.toArray
   ```
   Remove this code and use a Scala filter like below:
   `tempLoadMetaData.filter(loadMetadataDetails => segments.get.contains(loadMetadataDetails.getLoadName))`
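The reviewer's one-liner with its identifier typo corrected, as it would read against the diff's `tempLoadMetaData` (still a sketch, not the merged code):

```
// Sketch: the nested foreach plus java.util.ArrayList collapses to a filter.
val mainTableDetails: Array[LoadMetadataDetails] =
  tempLoadMetaData.filter(loadMetadataDetails =>
    segments.get.contains(loadMetadataDetails.getLoadName))
```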
Indhumathi27 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476196350

##########  File path: docs/index/secondary-index-guide.md  ##########

 @@ -188,4 +188,13 @@
 where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
+  ```
+
+### Repair index Command
+This command is used to reload segments in the SI table when there is a mismatch in the number
+of segments between the main table and the SI table.
+
+Syntax
+  ```
+  REPAIR INDEX ON TABLE maintable_name

Review comment:
   Looks like index_name is missing from the syntax.
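For comparison, the test cases quoted later in this thread run the command with an explicit index table name and an optional segment list, which points at a syntax along these lines (a hedged reconstruction, not the merged docs text):

```
REINDEX INDEX TABLE index_name ON [db_name.]maintable_name [WHERE SEGMENT.ID IN (id1, id2, ...)]
```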
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476215511

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+      val databaseName = if (tableNameOp.database.isEmpty) {

Review comment:
   IntelliJ prompts to replace it with .isEmpty

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+        tableNameOp.database.get.toString

Review comment:
   done

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+      // table level and index level

Review comment:
   changed

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+    segments: Option[List[String]]) extends DataCommand{

Review comment:
   done
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476216833

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,

Review comment:
   done

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+/**
+ * Show indexes on the table

Review comment:
   changed

##########  File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala  ##########

 @@ -0,0 +1,200 @@
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing create index table
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    sql("drop table if exists maintable")
+  }
+
+  test("reindex command after deleting segments from SI table on other database without use") {
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+    sql("create database test")
+    sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+  }
+
+  test("reindex command using segment.id after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")
+    val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(postDeleteSegments + 2 == postFirstRepair)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")

Review comment:
   changed
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476217005

##########  File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala  ##########

+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)

Review comment:
   added

##########  File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala  ##########

+/**
+ * test cases for testing create index table

Review comment:
   changed
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476218914

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)

Review comment:
   done
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476219675

##########  File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala  ##########

+    } else {
+      // for all tables in the db

Review comment:
   done
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r476220269

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
[...]
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+      indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when SI creation and load to the main table run in parallel, get the carbonTable
+    // from the metastore, which will have the latest index info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath =
+      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+        null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table, do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {

Review comment:
       IntelliJ prompts to use isEmpty here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]
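The isEmpty reply above concerns the check on the optional segment list. Since the reviewer's original suggestion is not quoted in this thread, the following is only an illustration of the idiom IntelliJ's inspection typically suggests, with hypothetical variable names:

    val segments: Option[List[String]] = None

    // Form used in the diff, as suggested by the inspection:
    val repairAllSegments = segments.isEmpty

    // Equivalent pattern match that the inspection usually flags as verbose:
    val repairAllSegmentsVerbose = segments match {
      case None => true
      case Some(_) => false
    }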
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r476235938

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
[...]
+      // if there are no index tables for a given fact table, do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]
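The quoted hunk ends at the branch that reads the metadata of every load when no segment list is given. Presumably the elided else-branch restricts the details to the requested segment ids; the sketch below is an assumption about that branch, not the PR's actual code (the helper name selectDetails is hypothetical):

    import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}

    // Hypothetical selection of load details: read everything when no
    // segment list is given, otherwise keep only the requested loads.
    // This is an assumption about the elided branch, not code from the PR.
    def selectDetails(metadataPath: String,
        segments: Option[List[String]]): Array[LoadMetadataDetails] = {
      val all = SegmentStatusManager.readLoadMetadata(metadataPath)
      segments match {
        case None => all
        case Some(ids) => all.filter(detail => ids.contains(detail.getLoadName))
      }
    }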