vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476263117 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, 
indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) Review comment: removed this variable ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476264599 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, Review comment: done ---------------------------------------------------------------- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476265007 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) Review comment: it is required to get SI in 
`processSIRepair` method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476265118 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + var isPresent : Boolean = false 
Review comment: added more code after that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476265118 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + var isPresent : Boolean = false 
Review comment: removed ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476267360 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + var isPresent : Boolean = false + 
indexTables.foreach { + indexTableName => + if (indexTableName == indexName.get) { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476267430 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Show indexes on the table + */ +case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand{ + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // table level and index level + val databaseName = if (tableNameOp.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableNameOp.database.get.toString + } + triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments) + } else { + // for all tables in the db + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean, + indexName: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the 
latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val carbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(carbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) + + val indexMetadata = carbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]() + segments.get.foreach(segmentNo => { + tempLoadMetaData.foreach(metadata => + if (metadata.getLoadName.equals(segmentNo)) { + listLoadMetaDataDetails.add(metadata) + }) + }) + listLoadMetaDataDetails.asScala.toArray + } + if (allIndex) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + var isPresent : Boolean = false + 
indexTables.foreach { + indexTableName => + if (indexTableName == indexName.get) { + isPresent = true + CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r476282119 ########## File path: docs/index/secondary-index-guide.md ########## @@ -188,4 +188,13 @@ where we have old stores. Syntax ``` REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name + ``` + +### Repair index Command +This command is used to reload segments in the SI table in case when there is some mismatch in the number +of segments in main table and the SI table Review comment: done ########## File path: docs/index/secondary-index-guide.md ########## @@ -188,4 +188,13 @@ where we have old stores. Syntax ``` REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name + ``` + +### Repair index Command +This command is used to reload segments in the SI table in case when there is some mismatch in the number +of segments in main table and the SI table + +Syntax + ``` + REPAIR INDEX ON TABLE maintable_name Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-679951490 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2118/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#issuecomment-679951854 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3859/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477342379 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Repair logic for reindex command on maintable/indextable + */ +case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // dbName is null, repair for index table or all the index table in main table + val databaseName = if (tableIdentifier.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase Review comment: please use the sparkSession which is passed as parameter to this method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477343137 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Repair logic for reindex command on maintable/indextable + */ +case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // dbName is null, repair for index table or all the index table in main table + val databaseName = if (tableIdentifier.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableIdentifier.database.get + } + triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments) + } else { + // repairing si for all index tables in the mentioned database in the repair command + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexnameOp, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, Review comment: change "tableNameOp" to "tableName" ---------------------------------------------------------------- This is an automated message from the Apache 
Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477343487 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Repair logic for reindex command on maintable/indextable + */ +case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // dbName is null, repair for index table or all the index table in main table + val databaseName = if (tableIdentifier.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableIdentifier.database.get + } + triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments) + } else { + // repairing si for all index tables in the mentioned database in the repair command + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexnameOp, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, + indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get Review comment: pass the 
sparkSession from the caller ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477347873 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Repair logic for reindex command on maintable/indextable + */ +case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // dbName is null, repair for index table or all the index table in main table + val databaseName = if (tableIdentifier.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableIdentifier.database.get + } + triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments) + } else { + // repairing si for all index tables in the mentioned database in the repair command + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexnameOp, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, + indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and 
load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val mainCarbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(mainCarbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable)) + + val indexMetadata = mainCarbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + carbonLoadModel.getLoadMetadataDetails.asScala.toList + // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter( + loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName)) + } + if (indexTableToRepair.isEmpty) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get)) 
+ indexTables.foreach { Review comment: Please recheck this logic. After filtering the indexTables by indexTableToRepair, you are not using the filtered result for the repair; instead, all the indexTables are considered for repair. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477348313 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.index + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.index.IndexType +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} + +/** + * Repair logic for reindex command on maintable/indextable + */ +case class IndexRepairCommand(indexnameOp: Option[String], tableIdentifier: TableIdentifier, + dbName: String, + segments: Option[List[String]]) extends DataCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def processData(sparkSession: SparkSession): Seq[Row] = { + if (dbName == null) { + // dbName is null, repair for index table or all the index table in main table + val databaseName = if (tableIdentifier.database.isEmpty) { + SparkSession.getActiveSession.get.catalog.currentDatabase + } else { + tableIdentifier.database.get + } + triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments) + } else { + // repairing si for all index tables in the mentioned database in the repair command + sparkSession.sessionState.catalog.listTables(dbName).foreach { + tableIdent => + triggerRepair(tableIdent.table, dbName, indexnameOp, segments) + } + } + Seq.empty + } + + def triggerRepair(tableNameOp: String, databaseName: String, + indexTableToRepair: Option[String], segments: Option[List[String]]): Unit = { + val sparkSession = SparkSession.getActiveSession.get + // when Si creation and 
load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val mainCarbonTable = metaStore + .lookupRelation(Some(databaseName), tableNameOp)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + + val carbonLoadModel = new CarbonLoadModel + carbonLoadModel.setDatabaseName(databaseName) + carbonLoadModel.setTableName(tableNameOp) + carbonLoadModel.setTablePath(mainCarbonTable.getTablePath) + val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(mainCarbonTable.getTablePath) + carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).toList.asJava) + carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable)) + + val indexMetadata = mainCarbonTable.getIndexMetadata + val secondaryIndexProvider = IndexType.SI.getIndexProviderName + if (null != indexMetadata && null != indexMetadata.getIndexesMap && + null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) { + val indexTables = indexMetadata.getIndexesMap + .get(secondaryIndexProvider).keySet().asScala + // if there are no index tables for a given fact table do not perform any action + if (indexTables.nonEmpty) { + val mainTableDetails = if (segments.isEmpty) { + carbonLoadModel.getLoadMetadataDetails.asScala.toList + // SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + } else { + // get segments for main table + carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter( + loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName)) + } + if (indexTableToRepair.isEmpty) { + indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + } else { + indexTables.filter(indexTable => indexTable.equals(indexTableToRepair.get)) 
+ indexTables.foreach { + indexTableName => + CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel, + indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession) + } + if (indexTables.isEmpty) { + LOGGER.info("Unable to find index table" + indexTableToRepair.get) Review comment: Why is this `indexTables.isEmpty` check needed? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477348891 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -377,4 +381,212 @@ object CarbonIndexUtil { AlterTableUtil.releaseLocks(locks.asScala.toList) } } + + def processSIRepair(indexTableName: String, carbonTable: CarbonTable, + carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata, + mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String) + (sparkSession: SparkSession) : Unit = { + val sparkSession = SparkSession.getActiveSession.get Review comment: please use the sparkSession which is passed ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477350602 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -377,4 +381,212 @@ object CarbonIndexUtil { AlterTableUtil.releaseLocks(locks.asScala.toList) } } + + def processSIRepair(indexTableName: String, carbonTable: CarbonTable, + carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata, + mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String) + (sparkSession: SparkSession) : Unit = { + val sparkSession = SparkSession.getActiveSession.get + // val databaseName = sparkSession.catalog.currentDatabase + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val indexTable = metaStore + .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)( + sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + + val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) Review comment: consider passing the loadMetadata details from the caller to avoid multiple reading. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477351380 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -377,4 +381,212 @@ object CarbonIndexUtil { AlterTableUtil.releaseLocks(locks.asScala.toList) } } + + def processSIRepair(indexTableName: String, carbonTable: CarbonTable, + carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata, + mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String) + (sparkSession: SparkSession) : Unit = { + val sparkSession = SparkSession.getActiveSession.get + // val databaseName = sparkSession.catalog.currentDatabase + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val indexTable = metaStore + .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)( + sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + + val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val siTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) + var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty + if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg( + mainTblLoadMetadataDetails, + siTblLoadMetadataDetails)) { + val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider, + indexTableName) + val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName), Review comment: change variable name to indexModel ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477351673 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -377,4 +381,212 @@ object CarbonIndexUtil { AlterTableUtil.releaseLocks(locks.asScala.toList) } } + + def processSIRepair(indexTableName: String, carbonTable: CarbonTable, + carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata, + mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String) + (sparkSession: SparkSession) : Unit = { + val sparkSession = SparkSession.getActiveSession.get + // val databaseName = sparkSession.catalog.currentDatabase + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val indexTable = metaStore + .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)( + sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + + val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val siTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) + var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty + if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg( + mainTblLoadMetadataDetails, + siTblLoadMetadataDetails)) { + val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider, + indexTableName) + val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName), + indexMetadata.getParentTableName, + indexColumns.split(",").toList, + indexTableName) + + var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) Review comment: Why is a second read of the SI table's load metadata required? `siTblLoadMetadataDetails` above is already read from the same `indexTable.getMetadataPath`.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on a change in pull request #3873: URL: https://github.com/apache/carbondata/pull/3873#discussion_r477352862 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -377,4 +381,212 @@ object CarbonIndexUtil { AlterTableUtil.releaseLocks(locks.asScala.toList) } } + + def processSIRepair(indexTableName: String, carbonTable: CarbonTable, + carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata, + mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String) + (sparkSession: SparkSession) : Unit = { + val sparkSession = SparkSession.getActiveSession.get + // val databaseName = sparkSession.catalog.currentDatabase + // when Si creation and load to main table are parallel, get the carbonTable from the + // metastore which will have the latest index Info + val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + val indexTable = metaStore + .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)( + sparkSession) + .asInstanceOf[CarbonRelation] + .carbonTable + + val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + val siTblLoadMetadataDetails: Array[LoadMetadataDetails] = + SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) + var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty + if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg( + mainTblLoadMetadataDetails, + siTblLoadMetadataDetails)) { + val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider, + indexTableName) + val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName), + indexMetadata.getParentTableName, + indexColumns.split(",").toList, + indexTableName) + + var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) + // If it empty, then no need to do further computations because the + // tabletstatus might not have been created 
and hence next load will take care + if (details.isEmpty) { + Seq.empty + } + + val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util + .ArrayList[LoadMetadataDetails]() + + // read the details of SI table and get all the failed segments during SI + // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS + details.collect { Review comment: Please change this `collect` to `foreach`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |