[GitHub] [carbondata] ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

GitBox
ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r278066086
 
 

 ##########
 File path: core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
 ##########
 @@ -196,4 +207,278 @@ private static ICarbonLock getDataMapStatusLock() {
         .getSystemLevelCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
             LockUsage.DATAMAP_STATUS_LOCK);
   }
+
+  /**
+   * Reads and returns dataMapSegmentStatusDetail
+   *
+   * @param dataMapSchema
+   * @throws IOException
+   */
+  public DataMapSegmentStatusDetail getDataMapSegmentStatus(DataMapSchema dataMapSchema)
+      throws IOException {
+    String statusPath = getDatamapSegmentStatusFile(dataMapSchema.getDataMapName());
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    DataMapSegmentStatusDetail dataMapSegmentStatusDetail;
+    try {
+      if (!FileFactory.isFileExist(statusPath)) {
+        return new DataMapSegmentStatusDetail();
+      }
+      dataInputStream =
+          FileFactory.getDataInputStream(statusPath, FileFactory.getFileType(statusPath));
+      inStream = new InputStreamReader(dataInputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      dataMapSegmentStatusDetail =
+          gsonObjectToRead.fromJson(buffReader, DataMapSegmentStatusDetail.class);
+    } catch (IOException e) {
+      LOG.error("Failed to read datamap segment status", e);
+      throw e;
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    if (null == dataMapSegmentStatusDetail) {
+      return new DataMapSegmentStatusDetail();
+    }
+    return dataMapSegmentStatusDetail;
+  }
+
+  /**
+   * After each data load to mv datamap, update the segment status mapping. Get the new load name
+   * from datamap table loadMetaDetails and map newly loaded main table segments against the datamap
+   * table new load entry
+   *
+   * @param dataMapSchema
+   * @throws IOException
+   */
+  public void updateSegmentMapping(DataMapSchema dataMapSchema) throws IOException {
+    DataMapSegmentStatusDetail dataMapSegmentStatus = getDataMapSegmentStatus(dataMapSchema);
+    List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+    CarbonTable dataMapTable = CarbonTable
+        .buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
+            dataMapSchema.getRelationIdentifier().getDatabaseName(),
+            dataMapSchema.getRelationIdentifier().getTablePath(),
+            dataMapSchema.getRelationIdentifier().getTableId());
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath());
+    if (loadMetadataDetails.length != 0) {
+      String newLoadKey;
+      if (!dataMapSegmentStatus.getSegmentMapping().isEmpty()) {
+        for (LoadMetadataDetails entry : loadMetadataDetails) {
+          if (entry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE
+              || entry.getSegmentStatus() == SegmentStatus.COMPACTED) {
+            //In case of main table or datamap compaction, remove invalid entries
+            dataMapSegmentStatus.getSegmentMapping().remove(entry.getLoadName());
+          }
+        }
+      } else {
+        dataMapSegmentStatus.setDataMapName(dataMapSchema.getDataMapName());
+      }
+      newLoadKey = loadMetadataDetails[loadMetadataDetails.length - 1].getLoadName();
+      Map<String, List<String>> mainTableSegmentMap = new HashMap<>();
+      for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+        List<String> validMainTableSegmentList =
+            DataMapStatusManager.getSegmentList(relationIdentifier);
+        List<String> datamapTableSegmentList =
+            getDataMapSegmentsFromMapping(dataMapSegmentStatus, relationIdentifier);
+        // Compare main table and datamap table valid segment list and collect newly loaded segments
+        // from main table to datamap table
+        validMainTableSegmentList.removeAll(datamapTableSegmentList);
+        mainTableSegmentMap.put(relationIdentifier.getTableName(), validMainTableSegmentList);
+      }
+      dataMapSegmentStatus.getSegmentMapping().put(newLoadKey, mainTableSegmentMap);
+      dataMapSegmentStatus.setSegmentMapping(dataMapSegmentStatus.getSegmentMapping());
+      writeToSegmentStatusFile(dataMapSegmentStatus, dataMapSchema.getDataMapName());
+    }
+  }
+
+  /**
+   * write datamap to mainTbale segment mapping details
+   *
+   * @param dataMapSegmentStatus
+   * @param dataMapName
+   * @throws IOException
+   */
+  private void writeToSegmentStatusFile(DataMapSegmentStatusDetail dataMapSegmentStatus,
+      String dataMapName) throws IOException {
+    ICarbonLock carbonTableStatusLock = getDataMapStatusLock();
+    try {
+      if (carbonTableStatusLock.lockWithRetries()) {
+        writeSegmentDetailsIntoFile(getDatamapSegmentStatusFile(dataMapName), dataMapSegmentStatus);
+      } else {
+        String errorMsg = "Not able to acquire the lock for DataMap Segment status updation";
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    } finally {
+      if (carbonTableStatusLock.unlock()) {
+        CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.DATAMAP_STATUS_LOCK);
+      }
+    }
+  }
+
+  /**
+   * Returns list of segments of mainTable which are already loaded to MV dataMap table
+   *
+   * @param dataMapSegmentStatus
+   * @param relationIdentifier
+   */
+  public List<String> getDataMapSegmentsFromMapping(DataMapSegmentStatusDetail dataMapSegmentStatus,
+      RelationIdentifier relationIdentifier) {
+    List<String> dataMapTableSegmentList = new ArrayList<>();
+    for (Map.Entry<String, Map<String, List<String>>> dataMapSegmentIterator : dataMapSegmentStatus
+        .getSegmentMapping().entrySet()) {
+      for (Map.Entry<String, List<String>> mainTableSegmentIterator : dataMapSegmentIterator
+          .getValue().entrySet()) {
+        String mainTableName = mainTableSegmentIterator.getKey();
+        if (mainTableName.equalsIgnoreCase(relationIdentifier.getTableName())) {
+          dataMapTableSegmentList.addAll(mainTableSegmentIterator.getValue());
+        }
+      }
+    }
+    return dataMapTableSegmentList;
+  }
+
+  /**
+   * Update datamap segment status mapping if datamap table is compacted
+   *
+   * @param dataMapSchema
+   * @param loadMetadataDetails
+   * @throws IOException
+   */
+  public void updateMappingAfterCompaction(DataMapSchema dataMapSchema,
 
 Review comment:
   This class is only for writing the data, not for updating the mapping

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services