[GitHub] [carbondata] kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV


GitBox
kunal642 commented on a change in pull request #3245: [CARBONDATA-3398] Handled show cache for index server and MV
URL: https://github.com/apache/carbondata/pull/3245#discussion_r296178025
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 ##########
 @@ -62,216 +67,366 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
     }
   }
 
-  override protected def opName: String = "SHOW CACHE"
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (tableIdentifier.isEmpty) {
+      /**
+       * Assemble result for database
+       */
+      getAllTablesCache(sparkSession)
+    } else {
+      /**
+       * Assemble result for table
+       */
+      val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+      Checker
+        .validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
+      val numberOfIndexFiles = CacheUtil.getAllIndexFiles(carbonTable).size
+      val driverRawResults = getTableCacheFromDriver(sparkSession, carbonTable, numberOfIndexFiles)
+      val indexRawResults = if (CarbonProperties.getInstance().isDistributedPruningEnabled
+      (tableIdentifier.get.database.getOrElse(sparkSession.catalog.currentDatabase),
+        tableIdentifier.get.table)) {
+        getTableCacheFromIndexServer(carbonTable, numberOfIndexFiles)(sparkSession)
+      } else { Seq() }
+      val result = driverRawResults.slice(0, 2) ++
+                   driverRawResults.drop(2).map { row =>
+                     Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                   }
+      val serverResults = indexRawResults.slice(0, 2) ++
+                          indexRawResults.drop(2).map { row =>
+                            Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+                          }
+      Seq(Row("DRIVER CACHE", "", "")) ++ result.map {
+        row =>
+          Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+      } ++ (serverResults match {
+        case Nil => Seq()
+        case list =>
+          Seq(Row("-----------", "-----------", "-----------"), Row("INDEX CACHE", "", "")) ++
+          list.map {
+          row => Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
+        }
+      })
+    }
+  }
 
   def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache
-    if (cache == null) {
-      Seq(
-        Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
-    } else {
-      var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-      sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
-        tableIdent =>
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled("", "")
+    if (cache == null && !isDistributedPruningEnabled) {
+      return makeEmptyCacheRows(currentDatabase)
+    }
+    var carbonTables = mutable.ArrayBuffer[CarbonTable]()
+    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+      tableIdent =>
+        try {
+          val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+          if (!carbonTable.isChildDataMap && !carbonTable.isChildTable) {
+            carbonTables += carbonTable
+          }
+        } catch {
+          case _: NoSuchTableException =>
+            LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+        }
+    }
+    val indexServerRows = if (isDistributedPruningEnabled) {
+      carbonTables.flatMap {
+        mainTable =>
           try {
-            val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-            if (!carbonTable.isChildDataMap) {
-              carbonTables += carbonTable
-            }
+            makeRows(getTableCacheFromIndexServer(mainTable)(sparkSession), mainTable)
           } catch {
-            case ex: NoSuchTableException =>
-              LOGGER.debug("Ignoring non-carbon table " + tableIdent.table)
+            case ex: UnsupportedOperationException => Seq()
           }
       }
+    } else { Seq() }
 
-      // All tables of current database
-      var (dbDatamapSize, dbDictSize) = (0L, 0L)
-      val tableList = carbonTables.flatMap {
+    val driverRows = if (cache != null) {
+      carbonTables.flatMap {
         carbonTable =>
           try {
-            val tableResult = getTableCache(sparkSession, carbonTable)
-            var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
-            tableResult.drop(2).foreach {
-              row =>
-                indexSize += row.getLong(1)
-                datamapSize += row.getLong(2)
-            }
-            val dictSize = tableResult(1).getLong(1)
-
-            dbDictSize += dictSize
-            dbDatamapSize += datamapSize
-
-            val tableName = if (!carbonTable.isTransactionalTable) {
-              carbonTable.getTableName + " (external table)"
-            }
-            else {
-              carbonTable.getTableName
-            }
-            Seq((currentDatabase, tableName, indexSize, datamapSize, dictSize))
+            makeRows(getTableCacheFromDriver(sparkSession, carbonTable), carbonTable)
           } catch {
-            case ex: UnsupportedOperationException =>
-              Seq.empty
+            case ex: UnsupportedOperationException => Seq()
           }
-      }.collect {
-        case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
-                                                                (datamapSize == 0) &&
-                                                                (dictSize == 0)) =>
-          Row(db, table, indexSize, datamapSize, dictSize)
       }
+    } else { Seq() }
 
+    val (driverdbIndexSize, driverdbDatamapSize, driverdbDictSize) = calculateDBIndexAndDatamapSize(
+      driverRows)
+    val (indexdbIndexSize, indexdbDatamapSize, indexAllDictSize) = calculateDBIndexAndDatamapSize(
+      indexServerRows)
+    val (indexAllIndexSize, indexAllDatamapSize) = getIndexServerCacheSizeForCurrentDB
+
+    val driverDisplayRows = if (cache != null) {
       val tablePaths = carbonTables.map {
         carbonTable =>
           carbonTable.getTablePath
       }
-
-      // Scan whole cache and fill the entries for All-Database-All-Tables
-      // and Current-Database-All-Tables
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      var dbIndexSize = 0L
-      cache.getCacheMap.asScala.foreach {
-        case (key, cacheable) =>
-          cacheable match {
-            case _: BlockletDataMapIndexWrapper =>
-              allIndexSize += cacheable.getMemorySize
-              if (tablePaths.exists { path => key.startsWith(path) }) {
-                dbIndexSize += cacheable.getMemorySize
-              }
-            case _: BloomCacheKeyValue.CacheValue =>
-              allDatamapSize += cacheable.getMemorySize
-            case _: AbstractColumnDictionaryInfo =>
-              allDictSize += cacheable.getMemorySize
-          }
+      val (driverIndexSize, driverDatamapSize, allDictSize) = getAllDriverCacheSize(tablePaths
+        .toList)
+      if (driverRows.nonEmpty) {
+        val rows = (Seq(
+          Row("ALL", "ALL", driverIndexSize, driverDatamapSize, allDictSize),
+          Row(currentDatabase, "ALL", driverdbIndexSize, driverdbDatamapSize, driverdbDictSize)
+        ) ++ driverRows).collect {
+          case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+            Row(row(0), row(1), bytesToDisplaySize(row.getLong(2)),
+              bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+        }
+        Seq(Row("DRIVER CACHE", "", "", "", "")) ++ rows
+      } else {
+        makeEmptyCacheRows(currentDatabase)
       }
+    } else {
+      makeEmptyCacheRows(currentDatabase)
+    }
 
-      Seq(
-        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
-        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
-      ) ++ tableList
+    //      val (serverIndexSize, serverDataMapSize) = getAllIndexServerCacheSize
+    val indexDisplayRows = if (indexServerRows.nonEmpty) {
+      val rows = (Seq(
+        Row("ALL", "ALL", indexAllIndexSize, indexAllDatamapSize, indexAllDictSize),
+        Row(currentDatabase, "ALL", indexdbIndexSize, indexdbDatamapSize, driverdbDictSize)
+      ) ++ indexServerRows).collect {
+        case row if row.getLong(2) != 0L || row.getLong(3) != 0L || row.getLong(4) != 0L =>
+          Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+            bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+      }
+      Seq(Row("INDEX SERVER CACHE", "", "", "", "")) ++ rows
+    } else {
+      Seq()
     }
+    driverDisplayRows ++ indexDisplayRows
   }
 
-  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+  def getTableCacheFromDriver(sparkSession: SparkSession, carbonTable: CarbonTable,
+      numOfIndexFiles: Int = 0): Seq[Row] = {
     val cache = CacheProvider.getInstance().getCarbonCache
-    val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
-    if (cache == null) {
-      var comments = 0 + "/" + allIndexFiles.size + " index files cached"
+    if (cache != null) {
+      val childTableList = getChildTableList(carbonTable)(sparkSession)
+      val (parentMetaCacheInfo, dataMapCacheInfo) = collectDriverMetaCacheInfo(carbonTable
+        .getTableUniqueName) match {
+        case list =>
+          val parentCache = list
+            .filter(_._4.equalsIgnoreCase(BlockletDataMapFactory.DATA_MAP_SCHEMA
+              .getProviderName)) match {
+            case Nil => ("", 0, 0L, "")
+            case head :: _ => head
+          }
+          val dataMapList = list
+            .filter(!_._4.equalsIgnoreCase(BlockletDataMapFactory.DATA_MAP_SCHEMA
+              .getProviderName))
+          (parentCache, dataMapList)
+        case Nil => (("", 0, 0L, ""), Nil)
+      }
+      val parentDictionary = getDictionarySize(carbonTable)(sparkSession)
+      val childMetaCacheInfos = childTableList.flatMap {
+        childTable =>
+          val tableArray = childTable._1.split("-")
+          val dbName = tableArray(0)
+          val tableName = tableArray(1)
+          val childMetaCacheInfo = collectDriverMetaCacheInfo(s"${dbName}_$tableName")
+          childMetaCacheInfo.map {
+            childMeta => Row(childMeta._1, childMeta._3, 0L, childTable._2)
+          }
+      } ++ (dataMapCacheInfo.map {
+        childMeta => Row(childMeta._1, childMeta._3, 0L, childMeta._4)
+      })
+      var comments = parentMetaCacheInfo._2 + s"/$numOfIndexFiles index files cached"
       if (!carbonTable.isTransactionalTable) {
         comments += " (external table)"
       }
-      return Seq(
-        Row("Index", 0L, comments),
+      Seq(
+        Row("Index", parentMetaCacheInfo._3, comments),
+        Row("Dictionary", parentDictionary, "")
+      ) ++ childMetaCacheInfos
+    } else {
+      Seq(
+        Row("Index", 0L, ""),
         Row("Dictionary", 0L, "")
       )
     }
+  }
 
-    val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
-    val operationContext = new OperationContext
-    // datamapName -> (datamapProviderName, indexSize, datamapSize)
-    val currentTableSizeMap = scala.collection.mutable.Map[String, (String, String, Long, Long)]()
-    operationContext.setProperty(carbonTable.getTableUniqueName, currentTableSizeMap)
-    OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
+  override protected def opName: String = "SHOW CACHE"
 
-    // Get all Index files for the specified table in cache
-    val (indexFilesLength, size) = if (CarbonProperties.getInstance()
-        .isDistributedPruningEnabled(carbonTable.getDatabaseName, carbonTable.getTableName)) {
-      getTableCache(carbonTable.getTableUniqueName)
-    } else {
-      val memorySizeForEachIndexFile: List[Long] = allIndexFiles.collect {
-        case indexFile if cache.get(indexFile) != null =>
-          cache.get(indexFile).getMemorySize
-      }
-      (memorySizeForEachIndexFile.length, memorySizeForEachIndexFile.sum)
+  private def makeEmptyCacheRows(currentDatabase: String) = {
+    Seq(
+      Row("ALL", "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0), bytesToDisplaySize(0)),
+      Row(currentDatabase, "ALL", bytesToDisplaySize(0), bytesToDisplaySize(0),
+        bytesToDisplaySize(0)))
+  }
+
+  private def calculateDBIndexAndDatamapSize(rows: Seq[Row]): (Long, Long, Long) = {
+    rows.map {
+      row =>
+        (row(2).asInstanceOf[Long], row(3).asInstanceOf[Long], row.get(4).asInstanceOf[Long])
+    }.fold((0L, 0L, 0L)) {
+      case (a, b) =>
+        (a._1 + b._1, a._2 + b._2, a._3 + b._3)
     }
+  }
 
-    // Extract dictionary keys for the table and create cache keys from those
-    val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
-    val sizeOfDictInCache = dictKeys.collect {
-      case dictKey if cache.get(dictKey) != null =>
-        cache.get(dictKey).getMemorySize
-    }.sum
+  private def makeRows(tableResult: Seq[Row], carbonTable: CarbonTable) = {
+    var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+    tableResult.drop(2).foreach {
+      row =>
+        indexSize += row.getLong(1)
+        datamapSize += row.getLong(2)
+    }
+    val dictSize = tableResult(1).getLong(1)
+    Seq(Row(carbonTable.getDatabaseName, carbonTable.getTableName,
+      indexSize,
+      datamapSize,
+      dictSize))
+  }
 
-    // Assemble result for all the datamaps for the table
-    val otherDatamaps = operationContext.getProperty(carbonTable.getTableUniqueName)
-      .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
-    val otherDatamapsResults: Seq[Row] = otherDatamaps.map {
-      case (name, (provider, indexSize, dmSize)) =>
-        Row(name, indexSize, dmSize, provider)
-    }.toSeq
-    var comments = indexFilesLength + "/" + allIndexFiles.size + " index files cached"
-    if (!carbonTable.isTransactionalTable) {
+  private def getTableCacheFromIndexServer(mainTable: CarbonTable, numberOfIndexFiles: Int = 0)
+    (sparkSession: SparkSession): Seq[Row] = {
+    val childTables = getChildTableList(mainTable)(sparkSession)
+    val cache = if (tableIdentifier.nonEmpty) {
+      executeJobToGetCache(childTables.map(_._1) ++ List(mainTable.getTableUniqueName))
+    } else {
+      cacheResult
+    }
+    val (mainTableFiles, mainTableCache) = getTableCache(cache, mainTable.getTableUniqueName)
+    val childMetaCacheInfos = childTables.flatMap {
+      childTable =>
+        val tableName = childTable._1.replace("-", "_")
 
 Review comment:
   Because the table unique names are in the format dbName-tableName, to make parsing out the dbName and tableName easier.
   
   If I used "_" as the separator and the dbName or tableName also contained "_", the split would become ambiguous.
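   
   For illustration only (the object and helper below are hypothetical, not part of this PR): a minimal sketch of why splitting on "-" stays unambiguous, assuming "-" cannot appear inside database or table names while "_" can.
   
     // Hypothetical sketch: parse a table unique name of the form dbName-tableName.
     object UniqueNameSplitSketch {
       // Split only on the first "-", which identifiers cannot contain,
       // so dbName and tableName are recovered unambiguously even when
       // either name contains "_".
       def parse(uniqueName: String): (String, String) = {
         val Array(dbName, tableName) = uniqueName.split("-", 2)
         (dbName, tableName)
       }
   
       def main(args: Array[String]): Unit = {
         // Both names contain "_", so splitting on "_" would be ambiguous;
         // splitting on "-" is not.
         println(parse("my_db-fact_table"))  // prints (my_db,fact_table)
       }
     }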
