[GitHub] [carbondata] akashrn5 commented on a change in pull request #3701: [CARBONDATA-3770] Improve partition count star query performance

GitBox

akashrn5 commented on a change in pull request #3701:
URL: https://github.com/apache/carbondata/pull/3701#discussion_r413567879



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -162,6 +165,49 @@ public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, Str
     }
   }
 
+  /**
+   * Get all index files list
+   *
+   * @param segmentFile
+   */
+  public List<CarbonFile> getAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile,

Review comment:
       Please remove this method; it duplicates existing code. Please reuse `org.apache.carbondata.core.metadata.SegmentFileStore#getIndexCarbonFiles` instead.
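       For example, the call site could be reduced to something like this (just a sketch, with `carbonTable` and `segmentPath` as in the caller; it assumes `getIndexCarbonFiles` is the instance method that returns the segment's index files):

       val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)
       // reuse the existing reader instead of adding a duplicated one
       val indexFiles: java.util.List[CarbonFile] = sfs.getIndexCarbonFiles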

##########
File path: core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {

Review comment:
       Please add a class-level comment describing the purpose of `SegmentIndexMeta`.
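       For example (suggested wording only, based on the fields of this class; adjust as needed):

       /**
        * Per-segment index metadata cached in the driver: the segment file path,
        * the list of index files belonging to the segment, and a map from pruned
        * index file path to the row count read from that file.
        */
       public class SegmentIndexMeta {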

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws IOException {
     }
   }
 
+  /**
+   * Read carbonindexmerge file and get data count
+   *
+   * @param mergeFilePath
+   * @throws IOException
+   */
+  public int readMergeIdxFileGetRowCount(String mergeFilePath) throws IOException {
+    ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
+    int rowCount = 0;
+    try {
+      thriftReader.open();
+      MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
+      MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
+      List<String> file_names = indexHeader.getFile_names();
+      List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+      assert (file_names.size() == fileData.size());
+      for (int i = 0; i < file_names.size(); i++) {
+        byte[] data = fileData.get(i).array();
+        DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(configuration);
+        List<DataFileFooter> fileFooters =
+                fileFooterConverter.getIndexInfo(file_names.get(i), data);
+        for (DataFileFooter footer : fileFooters) {
+          rowCount += footer.getNumberOfRows();
+        }
+      }
+      return rowCount;
+    } finally {
+      thriftReader.close();
+    }
+  }
+
+  public int readIdxFileGetRowCount(String indexFilePath) throws IOException {

Review comment:
       Same as above: please reuse `org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readIndexFile`. Please avoid code duplication.
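       The reuse could look roughly like this (a sketch only; it assumes `readIndexFile` takes the index file path — adapting its signature or visibility if needed — and fills the internal map exposed by `getCarbonIndexMap`, the way `readAllIIndexOfSegment` does, with `JavaConverters` in scope):

       val indexFileStore = new SegmentIndexFileStore
       indexFileStore.readIndexFile(indexFilePath) // existing reader, no duplicated parsing
       var rowCount: Long = 0
       for ((fileName, data) <- indexFileStore.getCarbonIndexMap.asScala) {
         val footers = new DataFileFooterConverter(FileFactory.getConfiguration)
           .getIndexInfo(fileName, data)
         footers.asScala.foreach(footer => rowCount += footer.getNumberOfRows)
       }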

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -289,6 +335,50 @@ public void readMergeFile(String mergeFilePath) throws IOException {
     }
   }
 
+  /**
+   * Read carbonindexmerge file and get data count
+   *
+   * @param mergeFilePath
+   * @throws IOException
+   */
+  public int readMergeIdxFileGetRowCount(String mergeFilePath) throws IOException {

Review comment:
       Please remove this method and reuse the existing `org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore#readMergeFile` method, getting the information from there. If that is not directly possible, I suggest doing a proper refactor so that the row count can be obtained from the existing method.
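       The same pattern as the sketch above would apply here, e.g. (the summing helper is hypothetical, factored out of the previous sketch so that neither path duplicates the footer decoding):

       indexFileStore.readMergeFile(mergeFilePath) // existing reader for merge files
       val rowCount = sumRowsFromIndexMap(indexFileStore.getCarbonIndexMap)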

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
##########
@@ -154,8 +155,32 @@ object CountStarPlan {
       return false
     }
     child collect {
-      case cd: Filter => return false
+      case filter: Filter => if (isPurePartitionPrune(child)) return true else return false
     }
     true
   }
+

Review comment:
       Please add a comment explaining what exactly "pure partition" means here.
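       For example, above `isPurePartitionPrune` (suggested wording only):

       /**
        * A count(*) query is a "pure partition prune" query when every filter
        * condition references only partition columns; in that case the count can
        * be computed from partition-pruned index metadata without scanning data.
        */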

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)

Review comment:
       @Zhangshunyu @QiangCai Just for count(*), loading a separate cache and using driver memory might impact the query time and driver memory usage of other queries, and count(*) is not a very frequent query, I feel.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Instead of getting the load details and then filtering them, please use `org.apache.carbondata.core.statusmanager.SegmentStatusManager.ValidAndInvalidSegmentsInfo#getValidSegments` and then get the segment file names from the valid segments.
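       Roughly (a sketch, assuming the usual API shape and `JavaConverters` in scope):

       val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
       val validSegmentPaths = ssm.getValidAndInvalidSegments.getValidSegments.asScala
         .filter(_.getSegmentFileName != null) // legacy segments may have none
         .map(_.getSegmentFileName)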

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(

Review comment:
       What about old stores, where we do not have segment files? That case is not handled; please handle it.
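       One possible shape for the handling (a sketch; the fallback helper is hypothetical):

       // legacy stores: some segments may have no segment file at all; in that
       // case skip the cached-index path and use the existing count(*) flow
       if (details.exists(_.getSegmentFile == null)) {
         return getRowCountViaExistingFlow() // hypothetical fallback
       }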

##########
File path: core/src/main/java/org/apache/carbondata/core/index/SegmentIndexMeta.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.index;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+public class SegmentIndexMeta {
+  private String segmentFilePath;
+  private final List<CarbonFile> segmentIndexFiles = new ArrayList<>();
+  private final Map<String, Long> prunedIndexFileToRowCountMap = new ConcurrentHashMap<>();
+
+  public SegmentIndexMeta(String segmentFilePath) {

Review comment:
       How are you handling the case of old segments, where the segment file won't be present?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+      val executorService = Executors.newFixedThreadPool(numThreads)
+      // to get the index files of valid segments from cache or scan through each segment
+      val futures = new java.util.ArrayList[Future[Long]]
+      for (segmentPath <- validSegmentPaths) {
+        futures.add(executorService.submit(new Callable[Long] {
+          override def call(): Long = {
+            var rowCountCurrentSeg: Long = 0
+            var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+            // tableSegmentIndexes already init.
+            if (tableSegmentIndexes.get(segmentPath) != null &&
+              tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size() > 0) {
+              indexFilesCurrentSeg =
+                tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+            } else {
+              // read from seg file
+              val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)

Review comment:
       This will throw a NullPointerException if the segment file name is null for a segment; or, at line number 191, `indexFilesCurrentSeg` will be null.
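       A minimal guard could look like this (a sketch using only names from the surrounding code; the real fix may instead fall back to the non-cached flow for such segments):

       val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)
       indexFilesCurrentSeg = if (sfs.getSegmentFile == null) {
         // segment file missing (e.g. legacy segment): avoid the NPE downstream
         new java.util.ArrayList[CarbonFile]()
       } else {
         indexFileStore.getAllIIndexOfSegment(sfs.getSegmentFile,
           carbonTable.getTablePath, SegmentStatus.SUCCESS, false)
       }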

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setIndexJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The detail of query flow as following for pure partition count star:
+  // Step 1. check whether it is pure partition count star by filter
+  // Step 2. read tablestatus to get all valid segments, remove the segment file cache of invalid
+  // segment and expired segment
+  // Step 3. use multi-thread to read segment files which not in cache and cache index files list
+  // of each segment into memory. If its index files already exist in cache, not required to
+  // read again.
+  // Step 4. use multi-thread to prune segment and partition to get pruned index file list, which
+  // can prune most index files and reduce the files num.
+  // Step 5. read the count from pruned index file directly and cache it, get from cache if exist
+  // in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.HashSet[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = IndexStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
+      val executorService = Executors.newFixedThreadPool(numThreads)
+      // to get the index files of valid segments from cache or scan through each segment
+      val futures = new java.util.ArrayList[Future[Long]]
+      for (segmentPath <- validSegmentPaths) {
+        futures.add(executorService.submit(new Callable[Long] {
+          override def call(): Long = {
+            var rowCountCurrentSeg: Long = 0
+            var indexFilesCurrentSeg: java.util.List[CarbonFile] = null;
+            // tableSegmentIndexes already init.
+            if (tableSegmentIndexes.get(segmentPath) != null &&
+              tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles.size() > 0) {
+              indexFilesCurrentSeg =
+                tableSegmentIndexes.get(segmentPath).getSegmentIndexFiles
+            } else {
+              // read from seg file
+              val sfs = new SegmentFileStore(carbonTable.getTablePath, segmentPath)
+              val indexFileStore = new SegmentIndexFileStore
+              indexFilesCurrentSeg = indexFileStore.getAllIIndexOfSegment(sfs.getSegmentFile,
+                carbonTable.getTablePath, SegmentStatus.SUCCESS, false)
+              // tableSegmentIndexes already init and segmentIndexMeta not null.
+              val segmentIndexMeta = tableSegmentIndexes.get(segmentPath)
+              segmentIndexMeta.addSegmentIndexFiles(indexFilesCurrentSeg)
+              // cache all the index files of this segment
+              tableSegmentIndexes.put(segmentPath, segmentIndexMeta)
+            }
+
+            // use partition prune to get row count of each pruned index file and
+            // cache it for this segment
+            var prunedIndexFilesCurrentSeg: java.util.List[String] =
+            new java.util.ArrayList[String]()
+            for (indexFile <- indexFilesCurrentSeg.asScala) {
+              // check whether the index files located in pruned partition
+              val formattedPath = indexFile.getAbsolutePath
+                .replace("\\", "/")
+              if (prunedPartitionPaths.contains(
+                formattedPath.substring(0, formattedPath.lastIndexOf("/")))) {
+                prunedIndexFilesCurrentSeg.add(formattedPath)
+              }
+            }
+            // get the row count from cache or read it from pruned index files
+            var toReadIndexFiles: java.util.List[String] = new java.util.ArrayList[String]()
+            prunedIndexFilesCurrentSeg.asScala.foreach(prunedIndexFilePath =>
+              if (tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap

Review comment:
       Please refactor: assign the result of `tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap.get(prunedIndexFilePath)` to a local variable, and add a setter on `SegmentIndexMeta` for updating the map.
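       That is, something like this (a sketch; the setter name on `SegmentIndexMeta` is hypothetical):

       val rowCountMap = tableSegmentIndexes.get(segmentPath).getPrunedIndexFileToRowCountMap
       prunedIndexFilesCurrentSeg.asScala.foreach { prunedIndexFilePath =>
         val cachedRowCount = rowCountMap.get(prunedIndexFilePath)
         if (cachedRowCount == null) {
           toReadIndexFiles.add(prunedIndexFilePath)
         } else {
           rowCountCurrentSeg += cachedRowCount
         }
       }
       // writes then go through e.g. segmentIndexMeta.putPrunedIndexFileRowCount(path, count)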

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
##########
@@ -531,7 +531,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index dm on table index_test_overwrite")
   }
 
-  test("explain query with lucene index") {
+  ignore("explain query with lucene index") {

Review comment:
       Why this change?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]