QiangCai commented on a change in pull request #3701: [CARBONDATA-3770] Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r410065804

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // Query flow for a pure partition count star query:
+  // Step 1. Check from the filter whether this is a pure partition count star query.
+  // Step 2. Read the table status to get all valid segments, and remove the cached segment
+  //         files of invalid and expired segments.
+  // Step 3. Use multiple threads to read the segment files that are not cached yet, and cache
+  //         the index file list of each segment in memory. If a segment's index files are
+  //         already cached, they are not read again.
+  // Step 4. Use multiple threads to prune segments and partitions to get the pruned index
+  //         file list, which removes most index files and reduces the number of files.
+  // Step 5. Read the row count directly from each pruned index file and cache it in the
+  //         index_file <-> rowCount map; reuse the cached count if it already exists.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    // Get the pruned partitions from the table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache entries
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // the cached entry belongs to a segment that is no longer valid
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // initialize cache entries for valid segments that are not cached yet
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)

Review comment:
@ajantha-bhat It tries to load only the required indexes into memory and avoids loading all indexes into the cache.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]

With regards,
Apache Git Services
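The five-step flow in the diff comment, together with the point about loading only the required indexes, can be illustrated with a small self-contained Scala sketch. It is not CarbonData's implementation: `SegmentDetail`, `PartitionCountStarSketch`, the injected `listIndexFiles` and `readRowCount` functions, and the assumption that every index file sits under its partition directory are all hypothetical, and the steps run sequentially here rather than on multiple threads.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a table status entry; not CarbonData's LoadMetadataDetails.
final case class SegmentDetail(segmentFile: String, status: String)

// Sequential sketch of the five-step pure partition count star flow.
// listIndexFiles and readRowCount are assumed loaders, injected so the sketch does no I/O.
final class PartitionCountStarSketch(
    listIndexFiles: String => Seq[String], // segment file -> index file paths
    readRowCount: String => Long) {        // index file -> row count stored in it

  // Step 3 cache: segment file -> index files of that segment.
  val segmentIndexCache: TrieMap[String, Seq[String]] = TrieMap.empty
  // Step 5 cache: index file -> row count.
  val rowCountCache: TrieMap[String, Long] = TrieMap.empty

  // Step 1 (deciding that the filter only touches partition columns) is assumed done;
  // prunedPartitionPaths is its result.
  def countStar(details: Seq[SegmentDetail], prunedPartitionPaths: Seq[String]): Long = {
    // Step 2: valid segments are successful or partially successful loads with a segment file.
    val validSegmentFiles = details
      .filter(d => d.segmentFile != null &&
        (d.status == "SUCCESS" || d.status == "LOAD_PARTIAL_SUCCESS"))
      .map(_.segmentFile)
      .toSet
    // Step 2: evict cached entries whose segment is no longer valid.
    segmentIndexCache.keys.toList
      .filterNot(validSegmentFiles.contains)
      .foreach(stale => segmentIndexCache.remove(stale))
    // Step 3: read a segment file only on a cache miss (the default argument is by-name).
    val indexFiles = validSegmentFiles.toList
      .flatMap(f => segmentIndexCache.getOrElseUpdate(f, listIndexFiles(f)))
    // Step 4: keep index files located under a pruned partition directory (assumed layout).
    val prunedIndexFiles = indexFiles
      .filter(file => prunedPartitionPaths.exists(p => file.startsWith(p + "/")))
    // Step 5: read each count once, afterwards serve it from the cache.
    prunedIndexFiles.map(file => rowCountCache.getOrElseUpdate(file, readRowCount(file))).sum
  }
}
```

Because `getOrElseUpdate` evaluates its second argument only when the key is missing, a segment file is read at most once while its segment stays valid, and only the index files of pruned partitions ever reach the row-count cache.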
QiangCai commented on a change in pull request #3701: [CARBONDATA-3770] Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r410067294

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)

Review comment:
If the driver does not have enough resources, we can run a job instead of using multiple threads.
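The `numThreads` line bounds the driver-side parallelism: at least one thread, at most four. A minimal sketch of that pattern with a JDK fixed thread pool and Scala futures might look like the following; `BoundedSegmentReader`, `poolSize`, and `readAll` are hypothetical names, and `work` stands in for whatever per-segment reading the real code performs. The job-based alternative raised in the comment would instead ship the same per-segment function to executors (for example through an RDD) so the driver does not do the I/O itself; that variant is not shown.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedSegmentReader {
  // Same sizing rule as the numThreads line in the diff: at least 1 thread, at most 4.
  def poolSize(numSegments: Int): Int = Math.min(Math.max(numSegments, 1), 4)

  // Runs `work` for every segment on a fixed-size pool and returns results in input order.
  def readAll[T](segments: Seq[String])(work: String => T): Seq[T] = {
    val pool = Executors.newFixedThreadPool(poolSize(segments.length))
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = segments.map(s => Future(work(s)))
      // Future.sequence preserves the order of the input futures.
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      // Release the driver threads even if one of the tasks failed.
      pool.shutdown()
    }
  }
}
```

Capping the pool keeps a table with many segments from spawning one thread per segment on the driver, while the lower bound of one still lets an empty segment list run without error.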
CarbonDataQA1 commented on issue #3701: [CARBONDATA-3770] Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#issuecomment-615806848

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2781/
CarbonDataQA1 commented on issue #3701: [CARBONDATA-3770] Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#issuecomment-615809517

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1068/