[GitHub] [carbondata] NamanRastogi commented on a change in pull request #3146: [WIP] Added PreAgg & Bloom Event-Listener for ShowCacheCommmand

GitBox
NamanRastogi commented on a change in pull request #3146: [WIP] Added PreAgg & Bloom Event-Listener for ShowCacheCommmand
URL: https://github.com/apache/carbondata/pull/3146#discussion_r267250425
 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
 ##########
 @@ -61,241 +62,167 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
 
   override protected def opName: String = "SHOW CACHE"
 
-  def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+  def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
       Seq(
-        Row("ALL", "ALL", bytesToDisplaySize(0L),
-          bytesToDisplaySize(0L), bytesToDisplaySize(0L)),
-        Row(currentDatabase, "ALL", bytesToDisplaySize(0L),
-          bytesToDisplaySize(0L), bytesToDisplaySize(0L)))
+        Row("ALL", "ALL", 0L, 0L, 0L),
+        Row(currentDatabase, "ALL", 0L, 0L, 0L))
     } else {
       val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
         .listAllTables(sparkSession)
-        .filter { table =>
-        table.getDatabaseName.equalsIgnoreCase(currentDatabase)
-      }
-      val tablePaths = carbonTables
-        .map { table =>
-          (table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR,
-            table.getDatabaseName + "." + table.getTableName)
-      }
-
-      val dictIds = carbonTables
-        .filter(_ != null)
-        .flatMap { table =>
-          table
-            .getAllDimensions
-            .asScala
-            .filter(_.isGlobalDictionaryEncoding)
-            .toArray
-            .map(dim => (dim.getColumnId, table.getDatabaseName + "." + table.getTableName))
+        .filter {
+          table =>
+            table.getDatabaseName.equalsIgnoreCase(currentDatabase)
+        }
+        .filter {
+          carbonTable =>
+            !carbonTable.isChildDataMap
         }
 
-      // all databases
-      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
-      // current database
+      // All tables of current database
       var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
-      val tableMapIndexSize = mutable.HashMap[String, Long]()
-      val tableMapDatamapSize = mutable.HashMap[String, Long]()
-      val tableMapDictSize = mutable.HashMap[String, Long]()
-      val cacheIterator = cache.getCacheMap.entrySet().iterator()
-      while (cacheIterator.hasNext) {
-        val entry = cacheIterator.next()
-        val cache = entry.getValue
-        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
-          // index
-          allIndexSize = allIndexSize + cache.getMemorySize
-          val indexPath = entry.getKey.replace(
-            CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-          val tablePath = tablePaths.find(path => indexPath.startsWith(path._1))
-          if (tablePath.isDefined) {
-            dbIndexSize = dbIndexSize + cache.getMemorySize
-            val memorySize = tableMapIndexSize.get(tablePath.get._2)
-            if (memorySize.isEmpty) {
-              tableMapIndexSize.put(tablePath.get._2, cache.getMemorySize)
-            } else {
-              tableMapIndexSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
-            }
-          }
-        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
-          // bloom datamap
-          allDatamapSize = allDatamapSize + cache.getMemorySize
-          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val tablePath = tablePaths.find(path => shardPath.contains(path._1))
-          if (tablePath.isDefined) {
-            dbDatamapSize = dbDatamapSize + cache.getMemorySize
-            val memorySize = tableMapDatamapSize.get(tablePath.get._2)
-            if (memorySize.isEmpty) {
-              tableMapDatamapSize.put(tablePath.get._2, cache.getMemorySize)
-            } else {
-              tableMapDatamapSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
-            }
+      val tableList: Seq[Row] = carbonTables.map {
+        carbonTable =>
+          val tableResult = getTableCache(sparkSession, carbonTable)
+          var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+          tableResult.drop(2).foreach {
+            row =>
+              indexSize += row.getLong(1)
+              datamapSize += row.getLong(2)
           }
-        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
-          // dictionary
-          allDictSize = allDictSize + cache.getMemorySize
-          val dictId = dictIds.find(id => entry.getKey.startsWith(id._1))
-          if (dictId.isDefined) {
-            dbDictSize = dbDictSize + cache.getMemorySize
-            val memorySize = tableMapDictSize.get(dictId.get._2)
-            if (memorySize.isEmpty) {
-              tableMapDictSize.put(dictId.get._2, cache.getMemorySize)
-            } else {
-              tableMapDictSize.put(dictId.get._2, memorySize.get + cache.getMemorySize)
-            }
-          }
-        }
+          val dictSize = tableResult(1).getLong(1)
+
+          dbIndexSize += indexSize
+          dbDictSize += dictSize
+          dbDatamapSize += datamapSize
+
+          (carbonTable.getTableName, currentDatabase, indexSize, datamapSize, dictSize)
+      }.filter {
+        case (_, _, indexSize, datamapSize, dictSize) =>
+          !((indexSize == 0) && (datamapSize == 0) && (dictSize == 0))
+      }.map {
+        case (db, table, indexSize, datamapSize, dictSize) =>
+          Row(db, table, indexSize, datamapSize, dictSize)
       }
-      if (tableMapIndexSize.isEmpty && tableMapDatamapSize.isEmpty && tableMapDictSize.isEmpty) {
-        Seq(
-          Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
-            bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
-          Row(currentDatabase, "ALL", bytesToDisplaySize(0),
-            bytesToDisplaySize(0), bytesToDisplaySize(0)))
-      } else {
-        val tableList = tableMapIndexSize
-          .map(_._1)
-          .toSeq
-          .union(tableMapDictSize.map(_._1).toSeq)
-          .distinct
-          .sorted
-          .map { uniqueName =>
-            val values = uniqueName.split("\\.")
-            val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
-            val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
-            val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
-            Row(values(0), values(1), bytesToDisplaySize(indexSize),
-              bytesToDisplaySize(datamapSize), bytesToDisplaySize(dictSize))
+
+      // All Databases
+      val tablePaths: Set[String] = carbonTables.map {
+        table =>
+          table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
+      }.toSet
+      val dictIDs: List[String] = carbonTables.flatMap {
+        table =>
+          table.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding).map {
+            dim =>
+              dim.getColumnId
           }
+      }.toList
 
-        Seq(
-          Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
-            bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
-          Row(currentDatabase, "ALL", bytesToDisplaySize(dbIndexSize),
-            bytesToDisplaySize(dbDatamapSize), bytesToDisplaySize(dbDictSize))
-        ) ++ tableList
+      var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+      cache.getCacheMap.asScala.foreach {
+        case (key, cacheable) =>
+          cacheable match {
+            case _: BlockletDataMapIndexWrapper =>
+              allIndexSize += cacheable.getMemorySize
+            case _: BloomCacheKeyValue.CacheValue =>
+              allDatamapSize += cacheable.getMemorySize
+            case _: AbstractColumnDictionaryInfo =>
+              allDictSize += cacheable.getMemorySize
+          }
       }
+
+      Seq(
+        Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
+        Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
+      ) ++ tableList
     }
   }
 
-  def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
-    val cache = CacheProvider.getInstance().getCarbonCache()
-    if (cache == null) {
-      Seq.empty
-    } else {
-      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
-      var numIndexFilesCached = 0
-
-      // Path -> Name, Type
-      val datamapName = mutable.Map[String, (String, String)]()
-      // Path -> Size
-      val datamapSize = mutable.Map[String, Long]()
-      // parent table
-      datamapName.put(tablePath, ("", ""))
-      datamapSize.put(tablePath, 0)
-      // children tables
-      for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
-        val childTableName = carbonTable.getTableName + "_" + schema.getDataMapName
-        val childTable = CarbonEnv
-          .getCarbonTable(Some(carbonTable.getDatabaseName), childTableName)(sparkSession)
-        val path = childTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
-        val name = schema.getDataMapName
-        val dmType = schema.getProviderName
-        datamapName.put(path, (name, dmType))
-        datamapSize.put(path, 0)
-      }
-      // index schemas
-      for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-        .asScala) {
-        val path = tablePath + schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
-        val name = schema.getDataMapName
-        val dmType = schema.getProviderName
-        datamapName.put(path, (name, dmType))
-        datamapSize.put(path, 0)
-      }
-
-      var dictSize = 0L
-
-      // dictionary column ids
-      val dictIds = carbonTable
-        .getAllDimensions
-        .asScala
-        .filter(_.isGlobalDictionaryEncoding)
-        .map(_.getColumnId)
-        .toArray
-
-      val cacheIterator = cache.getCacheMap.entrySet().iterator()
-      while (cacheIterator.hasNext) {
-        val entry = cacheIterator.next()
-        val cache = entry.getValue
-
-        if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
-          // index
-          val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val pathEntry = datamapSize.filter(entry => indexPath.startsWith(entry._1))
-          if(pathEntry.nonEmpty) {
-            val (path, size) = pathEntry.iterator.next()
-            datamapSize.put(path, size + cache.getMemorySize)
-          }
-          if(indexPath.startsWith(tablePath)) {
-            numIndexFilesCached += 1
-          }
-        } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
-          // bloom datamap
-          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
-            CarbonCommonConstants.FILE_SEPARATOR)
-          val pathEntry = datamapSize.filter(entry => shardPath.contains(entry._1))
-          if(pathEntry.nonEmpty) {
-            val (path, size) = pathEntry.iterator.next()
-            datamapSize.put(path, size + cache.getMemorySize)
-          }
-        } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
-          // dictionary
-          val dictId = dictIds.find(id => entry.getKey.startsWith(id))
-          if (dictId.isDefined) {
-            dictSize = dictSize + cache.getMemorySize
-          }
-        }
+  def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+    val cache = CacheProvider.getInstance().getCarbonCache
+    val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
 
 Review comment:
   Done.
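
A note on the refactored aggregation above: the match over the entries of
cache.getCacheMap has no wildcard arm, so an entry of any other Cacheable
type would fail with a scala.MatchError at runtime. Below is a minimal,
self-contained sketch of the same totalling logic with a defensive
catch-all arm. The three case classes are hypothetical stand-ins for
BlockletDataMapIndexWrapper, BloomCacheKeyValue.CacheValue and
AbstractColumnDictionaryInfo, and the "case _" arm is an illustrative
addition, not part of this PR.

// Hypothetical stand-ins for the CarbonData cacheable types in the diff.
sealed trait Cacheable { def getMemorySize: Long }
case class IndexEntry(getMemorySize: Long) extends Cacheable // ~ BlockletDataMapIndexWrapper
case class BloomEntry(getMemorySize: Long) extends Cacheable // ~ BloomCacheKeyValue.CacheValue
case class DictEntry(getMemorySize: Long) extends Cacheable  // ~ AbstractColumnDictionaryInfo

object CacheTotals extends App {
  // Toy cache map: key is a file path or column id, value the cached entry.
  val cacheMap: Map[String, Cacheable] = Map(
    "db1/t1/Fact/Part0"   -> IndexEntry(2048),
    "db1/t1/bloom/shard0" -> BloomEntry(512),
    "columnId-123"        -> DictEntry(256))

  var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
  cacheMap.foreach {
    case (_, cacheable) =>
      cacheable match {
        case _: IndexEntry => allIndexSize   += cacheable.getMemorySize
        case _: BloomEntry => allDatamapSize += cacheable.getMemorySize
        case _: DictEntry  => allDictSize    += cacheable.getMemorySize
        case _             => // skip unknown entry types instead of failing
      }
  }
  println(s"index=$allIndexSize datamap=$allDatamapSize dict=$allDictSize")
}

A related point: the rows built in getAllTablesCache and getTableCache now
carry raw Long byte counts instead of strings pre-formatted with
bytesToDisplaySize, which is what allows the per-table results to be summed
via getLong. Formatting can then happen once, at display time, for example
with Apache Commons IO (an assumption; the PR's bytesToDisplaySize helper
may or may not wrap this call):

import org.apache.commons.io.FileUtils

object SizeDisplay extends App {
  // Raw byte counts stay summable; format only when rendering the output.
  val total = Seq(2048L, 512L, 256L).sum
  println(FileUtils.byteCountToDisplaySize(total)) // prints "2 KB"
}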

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services