[GitHub] [carbondata] vikramahuja1001 opened a new pull request #3873: Repair SI Command

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478314130



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+    var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+      mainTblLoadMetadataDetails,
+      siTblLoadMetadataDetails)) {
+      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+        indexTableName)
+      val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
+        indexMetadata.getParentTableName,
+        indexColumns.split(",").toList,
+        indexTableName)
+
+      var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+      // If it empty, then no need to do further computations because the
+      // tabletstatus might not have been created and hence next load will take care
+      if (details.isEmpty) {
+        Seq.empty
+      }
+
+      val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
+      .ArrayList[LoadMetadataDetails]()
+
+      // read the details of SI table and get all the failed segments during SI
+      // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+      details.collect {

Review comment:
       done
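
In the hunk quoted above, the block `if (details.isEmpty) { Seq.empty }` evaluates `Seq.empty` and discards the value, so execution continues past the check even though the code comment says no further computation is needed. A minimal sketch of one way to make the remaining work conditional, using plain Scala values rather than CarbonData types; `EarlyExitSketch`, `repair`, and the step strings are hypothetical and not part of the PR:

```scala
import scala.collection.mutable.ListBuffer

object EarlyExitSketch {
  // Hypothetical stand-in for the repair body; returns the steps it ran
  // so the control flow is easy to observe.
  def repair(details: Array[String]): List[String] = {
    val steps = ListBuffer[String]()
    if (details.isEmpty) {
      // Nothing to repair yet: the remaining work sits in the else branch,
      // so it really is skipped.
      steps += "skipped"
    } else {
      steps += "collect failed segments"
      steps += "reload " + details.length + " segment(s)"
    }
    steps.toList
  }

  def main(args: Array[String]): Unit = {
    println(repair(Array.empty[String]))
    println(repair(Array("0", "1")))
  }
}
```

Putting the rest of the method in the `else` branch avoids an explicit `return`, which is usually discouraged in Scala.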

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+    var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+      mainTblLoadMetadataDetails,
+      siTblLoadMetadataDetails)) {
+      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+        indexTableName)
+      val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478314463



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,
+                    indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val mainCarbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+
+    val indexMetadata = mainCarbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList
+          // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        if (indexTableToRepair.isEmpty) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get))
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+          if (indexTables.isEmpty) {
+            LOGGER.info("Unable to find index table" + indexTableToRepair.get)

Review comment:
       This is to check whether the index table specified by the user actually exists.
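
In the hunk quoted above, `indexTables.filter(...)` is called but its result is discarded, so the `foreach` that follows still visits every index table, and the `indexTables.isEmpty` check sits inside the `indexTables.nonEmpty` branch. A minimal sketch of the existence check, using plain Scala collections in place of the CarbonData types; `IndexRepairSketch`, `selectIndexTables`, and the table names are hypothetical and not part of the PR:

```scala
object IndexRepairSketch {
  // Returns the index tables to repair: all of them when no name is given,
  // otherwise only the table whose name matches the request (possibly none).
  def selectIndexTables(indexTables: Set[String],
      indexTableToRepair: Option[String]): Set[String] = {
    indexTableToRepair match {
      case None => indexTables
      // filter returns a new collection; the result has to be kept and used
      case Some(name) => indexTables.filter(_ == name)
    }
  }

  def main(args: Array[String]): Unit = {
    val indexTables = Set("si_city", "si_name")
    val requested = "si_missing"
    val selected = selectIndexTables(indexTables, Some(requested))
    if (selected.isEmpty) {
      println("Unable to find index table " + requested)
    } else {
      selected.foreach(name => println("Repairing " + name))
    }
  }
}
```

Checking the filtered result, rather than the original set, is what lets the "not found" message fire when the user names a table that does not exist.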

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,
+                    indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val mainCarbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+
+    val indexMetadata = mainCarbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList
+          // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        if (indexTableToRepair.isEmpty) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get))
+          indexTables.foreach {

Review comment:
       changed the logic

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,
+                    indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase

Review comment:
       done
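
The `isEmpty`/`get` branch in the hunk quoted above is equivalent to `Option.getOrElse`. A small sketch with plain values; `DatabaseNameSketch`, `resolveDatabase`, and `currentDatabase` are hypothetical names, and the fallback is passed in rather than read from a Spark session:

```scala
object DatabaseNameSketch {
  // Uses the explicitly given database, or falls back to the current one.
  // The fallback is a by-name parameter, so it is only evaluated when needed.
  def resolveDatabase(database: Option[String], currentDatabase: => String): String =
    database.getOrElse(currentDatabase)

  def main(args: Array[String]): Unit = {
    println(resolveDatabase(Some("sales"), "default")) // prints "sales"
    println(resolveDatabase(None, "default"))          // prints "default"
  }
}
```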





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478329089



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Removed the repeated reads; the metadata is now read only once.
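
A minimal sketch of the read-once pattern this comment describes: the metadata is loaded a single time and the same value is handed to every consumer. `readLoadMetadata` here is a hypothetical stand-in that counts its calls, not `SegmentStatusManager.readLoadMetadata`, and the paths and segment names are made up:

```scala
object ReadOnceSketch {
  private var reads = 0

  // Hypothetical stand-in for an expensive table status read.
  def readLoadMetadata(path: String): Array[String] = {
    reads += 1
    Array("0", "1", "2")
  }

  def readCount: Int = reads

  // Works on the arrays the caller already read instead of re-reading them.
  def missingInIndex(mainSegments: Array[String],
      siSegments: Array[String]): Array[String] =
    mainSegments.filterNot(seg => siSegments.contains(seg))

  def main(args: Array[String]): Unit = {
    val mainSegments = readLoadMetadata("main/Metadata")
    // Pretend the SI table is one segment behind the main table.
    val siSegments = readLoadMetadata("si/Metadata").take(2)
    // Both uses below share siSegments; no second read of the SI metadata.
    println("missing: " + missingInIndex(mainSegments, siSegments).mkString(","))
    println("segments in SI: " + siSegments.length)
    println("reads: " + readCount)
  }
}
```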

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+    var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+      mainTblLoadMetadataDetails,
+      siTblLoadMetadataDetails)) {
+      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+        indexTableName)
+      val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
+        indexMetadata.getParentTableName,
+        indexColumns.split(",").toList,
+        indexTableName)
+
+      var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)

Review comment:
       Removed; it was redundant code.





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-681940790


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3891/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-681942622


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2150/
   



[GitHub] [carbondata] vikramahuja1001 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-681994461


   retest this please



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-682062796


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3894/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-682063656


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2153/
   



[GitHub] [carbondata] kunal642 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478825910



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Why is "readLoadMetadata" still there for the main table?
   





[GitHub] [carbondata] kunal642 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478826107



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,
+                    indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val mainCarbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+
+    val indexMetadata = mainCarbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList
+          // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        if (indexTableToRepair.isEmpty) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get))
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+          if (indexTables.isEmpty) {
+            LOGGER.info("Unable to find index table" + indexTableToRepair.get)

Review comment:
       Why not throw an exception?
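
A minimal sketch of the point being raised, assuming nothing beyond the quoted diff: Scala's `filter` returns a new collection rather than modifying the receiver, so the result has to be kept, and an empty match can then be raised as an error instead of being logged. `RepairSelectionSketch`, `selectIndexTables`, and `IndexTableNotFoundException` are hypothetical names used only for illustration; they are not part of CarbonData or of this PR.

```scala
object RepairSelectionSketch {
  // Hypothetical exception type; the PR may use a different one.
  final class IndexTableNotFoundException(message: String)
    extends RuntimeException(message)

  // Returns the index tables to repair.
  // With no index name given, every index table of the main table is selected.
  // With a name given, only the matching table is selected, and a missing
  // table is reported by throwing instead of logging.
  def selectIndexTables(
      indexTables: Set[String],
      indexTableToRepair: Option[String]): Set[String] = {
    indexTableToRepair match {
      case None => indexTables
      case Some(name) =>
        // filter does not mutate indexTables, so keep the returned set
        val matched = indexTables.filter(_ == name)
        if (matched.isEmpty) {
          throw new IndexTableNotFoundException(s"Unable to find index table $name")
        }
        matched
    }
  }
}
```

The caller would then loop over the returned set and trigger the repair once per selected table.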





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478831360



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null, repair for index table or all the index table in main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments)
+    } else {
+      // repairing si for all  index tables in the mentioned database in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String,
+                    indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val mainCarbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+
+    val indexMetadata = mainCarbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList
+          // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        if (indexTableToRepair.isEmpty) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get))
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+          if (indexTables.isEmpty) {
+            LOGGER.info("Unable to find index table" + indexTableToRepair.get)

Review comment:
       Okay, changed it to throw an exception.





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478837180



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -377,4 +381,212 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+  (sparkSession: SparkSession) : Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // val databaseName = sparkSession.catalog.currentDatabase
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Added it from the caller.





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-682366851


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3901/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-682370365


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2160/
   



[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478884183



##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table

Review comment:
       ```suggestion
   Reindex on all the secondary Indexes of the main table
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager

Review comment:
       You haven't taken the lock here, please check.
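
One way to read this request, sketched under stated assumptions: acquire a table-level lock before reading the table status, and release it in a `finally` so that a failed read cannot leave the lock held. `TableLock`, `withLock`, and `readSegmentsUnderLock` are hypothetical stand-ins that keep the sketch self-contained; the actual lock class and its retry behaviour in CarbonData may differ.

```scala
object LockedReadSketch {
  // Hypothetical stand-in for a table-level lock; not the CarbonData lock API.
  trait TableLock {
    def tryAcquire(): Boolean
    def release(): Unit
  }

  // Runs `body` only while the lock is held and always releases it afterwards.
  def withLock[A](lock: TableLock)(body: => A): A = {
    if (!lock.tryAcquire()) {
      throw new IllegalStateException("Unable to acquire the table status lock")
    }
    try body finally lock.release()
  }

  // `readTableStatus` stands in for reading the table status file.
  def readSegmentsUnderLock(lock: TableLock)(
      readTableStatus: () => List[String]): List[String] = {
    withLock(lock)(readTableStatus())
  }
}
```

If the lock cannot be acquired, the read is never attempted, so the load model is not populated from a table status file that another operation may be rewriting.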

##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table
+  ```
+  REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
+  ```
+Reindex on index table level

Review comment:
       ```suggestion
   Reindexing at index table level
   ```

##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table
+  ```
+  REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
+  ```
+Reindex on index table level
+
+  ```
+  REINDEX INDEX TABLE index_table ON [db_name.]main_table_name [WHERE SEGMENT.ID IN (1)]

Review comment:
       I think that in this syntax, `REINDEX INDEX TABLE index_table`, the `TABLE` keyword is not required. What do you think, @VenuReddy2103 @kunal642?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,

Review comment:
       Correct the formatting here; refer to the other case class definitions.





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r479959112



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier,

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r479959497



##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table
+  ```
+  REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
+  ```
+Reindex on index table level

Review comment:
       done

##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table

Review comment:
       done
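
For readers following the guide diff above, here is a minimal sketch of how the table-level `REINDEX` statement could be assembled from its optional parts. `ReindexStatement` and `forTable` are hypothetical names used only for illustration and are not part of CarbonData. Only the table-level form quoted in the diff is covered; the index-level syntax is cut off in the quoted hunk and is not guessed at here.

```scala
// Hypothetical helper, not part of CarbonData: builds the table-level REINDEX
// statement in the form quoted in the guide diff:
//   REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
object ReindexStatement {
  def forTable(table: String,
               db: Option[String] = None,
               segments: Seq[String] = Seq.empty): String = {
    // Qualify the table name only when a database is supplied.
    val qualified = db.map(d => s"$d.$table").getOrElse(table)
    // The segment filter is optional in the documented syntax.
    val filter =
      if (segments.isEmpty) ""
      else segments.mkString(" WHERE SEGMENT.ID IN(", ",", ")")
    s"REINDEX ON TABLE $qualified$filter"
  }
}
```

The resulting string could then be passed to `sparkSession.sql(...)` in a session where the CarbonData extensions from this PR are available.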





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox

vikramahuja1001 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r479960111



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager

Review comment:
       added
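
The dispatch in `processData` above (a single table when `dbName` is not set, otherwise every table in the database) can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the PR's code: `TableId`, `RepairTargets`, `currentDb` and `listTables` are hypothetical stand-ins for Spark's `TableIdentifier`, the session's current database and the catalog listing, and the nullable `dbName: String` of the diff is modelled as an `Option`.

```scala
// Simplified stand-in for Spark's TableIdentifier (assumption for this sketch).
final case class TableId(table: String, database: Option[String] = None)

object RepairTargets {
  // Returns the (database, table) pairs that a repair run would visit.
  // `currentDb` and `listTables` are hypothetical parameters replacing the
  // SparkSession catalog calls used in the diff.
  def resolve(tableId: TableId,
              dbName: Option[String],
              currentDb: String,
              listTables: String => Seq[String]): Seq[(String, String)] =
    dbName match {
      case None =>
        // Table-level or index-level repair: fall back to the current database
        // when the identifier carries none.
        Seq((tableId.database.getOrElse(currentDb), tableId.table))
      case Some(db) =>
        // Database-level repair: visit every table listed for that database.
        listTables(db).map(t => (db, t))
    }
}
```

Using `Option` with `getOrElse` avoids the `isEmpty` / `.get` pair in the diff and makes the null check on `dbName` explicit in the type.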





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-683681865


   Build succeeded with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2189/



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3873: [CARBONDATA-3956] Reindex command on SI table

GitBox

CarbonDataQA1 commented on pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#issuecomment-683685077


   Build succeeded with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3930/

