Posted by GitBox on Nov 03, 2020; 1:41pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-3988-WIP-Clean-index-files-when-clean-filesd-tp102150p103058.html
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515750468
##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
return carbonFile.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
- return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
- .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+ return (!oldIndexAndMergeIndexFiles.contains(file.getAbsolutePath()) && (
+ file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
}
});
}
+ public static List<String> getOldIndexAndMergeIndexFiles() {
+ return oldIndexAndMergeIndexFiles;
+ }
+
+ public static void setOldIndexAndMergeIndexFiles(List<String> oldIndexAndMergeIndexFiles) {
Review comment:
While writing the merge index file / segment file, it gets the index files from the segment directory. During the SI small-files merge step we will have `old.index` and `new.index` files, and only `new.index` is valid for the merge step and for writing the segment file. For the main table we could also store and use them like this for a normal load, but in a few cases, such as add segment or reading from an external location, we would have to read the merge index file to identify the invalid index files. And for clean files, I am getting all index files from the segment directory and eliminating those that are not present in the segment file.
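As a minimal sketch of the clean-files filtering described above (the class and method names below are illustrative, not the PR's actual API):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public final class CleanFilesFilterSketch {
  // Index files found on disk but not referenced by the segment file
  // are stale, so they become candidates for deletion.
  static List<String> findStaleIndexFiles(Set<String> filesInSegmentFile,
      List<String> filesOnDisk) {
    List<String> stale = new ArrayList<>();
    for (String file : filesOnDisk) {
      if (!filesInSegmentFile.contains(file)) {
        stale.add(file);
      }
    }
    return stale;
  }
}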
##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
return carbonFile.listFiles(new CarbonFileFilter() {
Review comment:
ok, removed
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
if (null == index) {
Review comment:
done
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
if (null == index) {
index = new LinkedList<>();
}
+ CarbonFile[] carbonIndexFiles =
+ index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
+ List<String> mergedIndexFiles =
+ SegmentFileStore.getInvalidAndMergedIndexFiles(carbonIndexFiles);
for (String indexPath : index) {
Review comment:
done
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
@Override
public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
Map<String, String> indexFiles;
- if (segment.getSegmentFileName() == null) {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ if (fileStore.getSegmentFile() == null) {
String path =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
} else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
indexFiles = fileStore.getIndexOrMergeFiles();
if (fileStore.getSegmentFile() != null) {
segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
}
}
+ List<String> index = new ArrayList<>(indexFiles.keySet());
+ CarbonFile[] carbonIndexFiles =
+ index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
Review comment:
done. `getInvalidAndMergedIndexFiles` takes a `List<String>` as its argument and returns a `List<String>`.
##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
indexFilesInPartition.add(indexCarbonFile);
}
}
- indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+ indexFiles = indexFilesInPartition;
} else {
- indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+ indexFiles = indexCarbonFiles;
}
+ }
+ if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+ return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+ readFileFooterFromCarbonDataFile, segmentPath,
+ indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);
} else {
- indexFiles =
+ CarbonFile[] indexFilesFromFile =
Review comment:
done
##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
indexFilesInPartition.add(indexCarbonFile);
}
}
- indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+ indexFiles = indexFilesInPartition;
} else {
- indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+ indexFiles = indexCarbonFiles;
}
+ }
+ if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+ return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+ readFileFooterFromCarbonDataFile, segmentPath,
+ indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);
Review comment:
done
##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -188,8 +188,22 @@ class PrestoInsertIntoTableTestCase
val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb",
"testtable")
val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
- val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
- assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+ val segment0 = CarbonTablePath
+ .getSegmentPath(carbonTable.getAbsoluteTableIdentifier.getTablePath, "0")
Review comment:
done
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
/**
* This method deletes the old index files or merge index file after data files merge
*/
- private def deleteOldIndexOrMergeIndexFiles(
+ private def collectOldIndexOrMergeIndexFiles(
factTimeStamp: Long,
validSegments: util.List[Segment],
indexCarbonTable: CarbonTable): Unit = {
+ val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
// delete the index/merge index carbonFile of old data files
Review comment:
done
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
/**
* This method deletes the old index files or merge index file after data files merge
*/
- private def deleteOldIndexOrMergeIndexFiles(
+ private def collectOldIndexOrMergeIndexFiles(
factTimeStamp: Long,
validSegments: util.List[Segment],
indexCarbonTable: CarbonTable): Unit = {
+ val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
// delete the index/merge index carbonFile of old data files
validSegments.asScala.foreach { segment =>
SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
.asScala
.foreach { indexFile =>
if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
- FileFactory.getCarbonFile(indexFile).delete()
+ oldIndexAndMergeIndexFiles.add(indexFile)
}
}
}
+ SegmentIndexFileStore.setOldIndexAndMergeIndexFiles(oldIndexAndMergeIndexFiles)
Review comment:
Modified one existing test case and added a new one to check data file merge without enabling the index merge property.
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
}
}
+ /**
+ * Clean invalid and expired index files of carbon table.
+ *
+ * @param carbonTable CarbonTable
+ */
+ def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+ val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ .getValidAndInvalidSegments.getValidSegments.asScala.toList
Review comment:
`deleteLoadsAndUpdateMetadata` reads the load metadata and is called from multiple places, so I think it's better to read it inside the method.
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
var tableStatusUpdateForFailure = false
if (successSISegments.nonEmpty && !isCompactionCall) {
- tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
- successSISegments,
- secondaryIndexModel.carbonLoadModel.getDatabaseName,
- secondaryIndexModel.secondaryIndex.indexName,
- SegmentStatus.INSERT_IN_PROGRESS,
- secondaryIndexModel.segmentIdToLoadStartTimeMapping,
- segmentToLoadStartTimeMap,
- indexCarbonTable,
- secondaryIndexModel.sqlContext.sparkSession)
-
// merge index files for success segments in case of only load
CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
successSISegments,
segmentToLoadStartTimeMap,
indexCarbonTable.getTablePath,
indexCarbonTable, mergeIndexProperty = false)
-
val loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(indexCarbonTable.getMetadataPath)
.filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+ for (loadMetadata <- loadMetadataDetails) {
Review comment:
Done
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
return null;
}
+ public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ List<String> mergedIndexFiles = new ArrayList<>();
+ long lastModifiedTime = 0L;
+ long length = 0L;
+ CarbonFile validMergeIndexFile = null;
+ List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+ for (CarbonFile file : indexFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ indexFileStore.readMergeFile(file.getCanonicalPath());
+ Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+ indexFileStore.getCarbonMergeFileToIndexFilesMap();
+ mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+ // In case there are more than 1 mergeindex files present, get the latest one.
+ if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {
Review comment:
done
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
}
}
+ /**
+ * Clean invalid and expired index files of carbon table.
+ *
+ * @param carbonTable CarbonTable
+ */
+ def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+ val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ .getValidAndInvalidSegments.getValidSegments.asScala.toList
+ validSegments.foreach( segment => {
+ val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+ segment.getSegmentFileName)
+ var indexFiles = List[CarbonFile]()
+ if (carbonTable.isHivePartitionTable) {
+ val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+ val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+ for (partitionSpec <- partitionSpecs) {
+ var carbonIndexFiles = SegmentIndexFileStore
+ .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+ .toList
+ carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+ .contains(segmentName.substring(
+ segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+ indexFiles = indexFiles ++ carbonIndexFiles
+ cleanUpIndexFilesForSegment(sfs, indexFiles)
+ }
+ } else {
+ val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+ val carbonIndexFiles = SegmentIndexFileStore
+ .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)
Review comment:
done
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -327,6 +315,21 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
return false;
}
+ public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+ CarbonFile[] indexFiles) throws IOException {
+ List<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(indexFiles);
+ for (CarbonFile file : indexFiles) {
+ // do not include already merged index files details in segment file.
+ if (!mergedAndInvalidIndexFiles.contains(file.getName())) {
Review comment:
I have renamed the method from `getInvalidAndMergedIndexFiles` to `getValidCarbonIndexFiles`, and added the empty-list check in `getValidCarbonIndexFiles`.
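A minimal sketch of the renamed helper as described (the signature and the empty-list check are assumed from the comment above; the PR's actual declaration may differ):

import java.util.ArrayList;
import java.util.List;

public final class ValidIndexFilesSketch {
  static List<String> getValidCarbonIndexFiles(List<String> allIndexFiles,
      List<String> mergedAndInvalidIndexFiles) {
    // Empty-list check: nothing to filter out, so return the input unchanged.
    if (mergedAndInvalidIndexFiles.isEmpty()) {
      return allIndexFiles;
    }
    List<String> valid = new ArrayList<>();
    for (String file : allIndexFiles) {
      if (!mergedAndInvalidIndexFiles.contains(file)) {
        valid.add(file);
      }
    }
    return valid;
  }
}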
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
@Override
public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
Map<String, String> indexFiles;
- if (segment.getSegmentFileName() == null) {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ if (fileStore.getSegmentFile() == null) {
String path =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
} else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
indexFiles = fileStore.getIndexOrMergeFiles();
if (fileStore.getSegmentFile() != null) {
segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
}
}
+ List<String> index = new ArrayList<>(indexFiles.keySet());
Review comment:
Without copying into a list first, I was getting a `ConcurrentModificationException`. I set the list to null at the end.
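To illustrate the exception being avoided (a standalone sketch, not the PR's code): removing entries from a map while iterating its `keySet()` view fails fast, whereas iterating a snapshot list of the keys is safe.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class KeySnapshotSketch {
  public static void main(String[] args) {
    Map<String, String> indexFiles = new HashMap<>();
    indexFiles.put("part0.carbonindex", "0");
    indexFiles.put("merged.carbonindexmerge", "0");
    // Removing from the map inside a for-each over indexFiles.keySet()
    // would throw ConcurrentModificationException on the next iteration.
    // Copying the keys into a list first makes the mutation safe:
    List<String> index = new ArrayList<>(indexFiles.keySet());
    for (String key : index) {
      if (key.endsWith(".carbonindex")) {
        indexFiles.remove(key);
      }
    }
    System.out.println(indexFiles); // {merged.carbonindexmerge=0}
  }
}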
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1366,18 +1412,10 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
/**
* This method returns the list of index/merge index files for a segment in carbonTable.
*/
- public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
- throws IOException {
+ public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath) {
Set<String> indexFiles;
- if (segment.getSegmentFileName() == null) {
- String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
- indexFiles =
- new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
- } else {
- SegmentFileStore segmentFileStore =
Review comment:
Reverted, and modified it to collect the invalid index files as well.
##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
+ @CarbonProperty
+ public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";
Review comment:
Done. Removed `CARBON_INDEXFILES_DELETETIME` and modified the code to use `max.query.execution.time` instead.
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
if (null == index) {
index = new LinkedList<>();
}
+ CarbonFile[] indexFiles = new CarbonFile[index.size()];
+ for (int i = 0; i < index.size(); i++) {
+ indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+ }
+ List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
for (String indexPath : index) {
if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
Review comment:
Done.
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
}
}
+ /**
+ * Clean invalid and expired index files of carbon table.
+ *
+ * @param carbonTable CarbonTable
+ */
+ def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+ val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ .getValidAndInvalidSegments.getValidSegments.asScala.toList
Review comment:
Done. Similarly, made changes to use the same `metadataDetails` for `cleanUpDeltaFiles`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]