GitHub user ravikiran23 opened a pull request:
https://github.com/apache/incubator-carbondata/pull/492

[CARBONDATA-440] Providing the update and delete support for CarbonData.

Please refer to the JIRA issue: CARBONDATA-440. This PR handles the sub-tasks from CARBONDATA-499 to CARBONDATA-510. UPDATE and DELETE DDLs have been added:

    update Country set (population) = (population + 150)
    delete from countryTable where country IN ('india', 'china')

Pending: the DDL commands document needs to be updated with examples.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sraghunandan/incubator-carbondata IUD_rebased

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/492.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #492

----

commit 53319769aad5f3b811aba3166ed4637010ad1234
Author: vinodkc <[hidden email]>
Date: 2016-12-30T08:49:16Z
    IUD Update and delete DML support

commit b1f7ae8d83f44a63c0e332b7d63e9bc9e6d37634
Author: nareshpr <[hidden email]>
Date: 2016-12-30T13:14:39Z
    IUD implicit tupleid

commit d6ec4fa164e11545bad3c4b20c10d537dc8d147e
Author: ravikiran23 <[hidden email]>
Date: 2017-01-02T05:47:24Z
    IUD update flow support

commit 28a9c40914e3b0ca08a585bef6a19d05a197188e
Author: ManoharVanam <[hidden email]>
Date: 2017-01-02T05:53:34Z
    IUD delete flow support

commit d861fc61ebfabda629c1e37a285eb07711472be3
Author: sounakr <[hidden email]>
Date: 2017-01-02T05:58:35Z
    IUD horizontal compaction of update and delete delta files support

commit 192b715325101aa780a08e9fede78845aa192c4a
Author: sujith71955 <[hidden email]>
Date: 2017-01-02T06:00:59Z
    IUD query flow support for update and delete delta files

commit 4d4d71e4b43417b7af7b05ea92166375fa963cdf
Author: Venkata Ramana G <[hidden email]>
Date: 2017-01-02T06:11:37Z
    IUD Integration to query flow

commit 374c03e9a26bd04649c055c5ecdaa34ad09327a5
Author: sounakr <[hidden email]>
Date: 2017-01-02T07:06:11Z
    IUD horizontal compaction of update and delete delta files support

commit 3084d2e3a1cef94831d3cb1ca90db4742091cbee
Author: sraghunandan <[hidden email]>
Date: 2017-01-02T07:34:47Z
    IUD checkstyle and scalastyle fixes

commit 9a1bfa7fa4417767b157cf881ecfc12bb242940e
Author: sounakr <[hidden email]>
Date: 2017-01-02T16:39:30Z
    corrected the IUD query processing flow

commit 5dc46a386725cfb40019e7757887774291bbff99
Author: ravikiran23 <[hidden email]>
Date: 2017-01-02T16:41:27Z
    corrected IUD test cases

commit c631e4e88b5dce845b9d50cc60c5db9e1238b127
Author: Manohar Vanam <[hidden email]>
Date: 2017-01-02T16:42:50Z
    corrected IUD test cases

commit 9d635b0334b74763565291e20a0e093e4bed32bb
Author: ravikiran <[hidden email]>
Date: 2017-01-03T16:57:17Z
    rebased with latest code.

commit b2bf2f2ff9e707e92b2dd79924870fb7edcaa30a
Author: ravikiran <[hidden email]>
Date: 2017-01-03T19:30:01Z
    correcting rebase error.

----
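For readers who want to try the new statements, here is a minimal usage sketch. The two SQL strings are taken verbatim from the description above; the surrounding Java driver and the `sqlContext` parameter (assumed to be a CarbonContext, or any SQLContext wired to the Carbon parser) are illustrative, not part of this PR:

```java
import org.apache.spark.sql.SQLContext;

public class IudUsageExample {
  // Runs the new IUD statements through Spark SQL. The context is assumed
  // to be created elsewhere (e.g. a CarbonContext as in the examples module).
  public static void run(SQLContext sqlContext) {
    // Increase the population column for all rows of the Country table.
    sqlContext.sql("update Country set (population) = (population + 150)");
    // Remove the rows of countryTable whose country matches the IN list.
    sqlContext.sql("delete from countryTable where country IN ('india', 'china')");
  }
}
```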
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Build Success with Spark 1.5.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/426/
Github user jackylk commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

@ravikiran23 please fix the compilation error in the Spark 2.1 integration.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Build Failed with Spark 1.5.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/439/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Build Success with Spark 1.5.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/441/
Github user manishgupta88 commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Please handle the following in general (a sketch of items 1 and 3 is shown below):
1. Add the Apache license header to newly added files.
2. Remove the author name used as a class-level comment.
3. Add a basic comment to each newly added class that describes the responsibility of the class.
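For items 1 and 3, a minimal sketch of what a newly added file would look like with the standard ASF license header and a responsibility-describing class comment. The class name is taken from a diff reviewed below; the Javadoc wording is illustrative, not the PR's actual comment:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.common.iudprocessor.cache;

/**
 * Caches the delete-delta information of one blocklet so that rows deleted
 * by IUD operations can be skipped while scanning.
 */
public class BlockletLevelDeleteDeltaDataCache {
  // ... members elided ...
}
```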
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94584731

--- Diff: core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/BlockletLevelDeleteDeltaDataCache.java ---
@@ -0,0 +1,29 @@
+package org.apache.carbondata.common.iudprocessor.cache;
+
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Created by S71955 on 06-10-2016.
+ */
+public class BlockletLevelDeleteDeltaDataCache {
--- End diff --

In my opinion, it would be better to rename the class to BlockletLevelDeleteDeltaCache.
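For context, the RoaringBitmap import in the diff suggests the cache keeps the deleted row ids of a blocklet as a compressed bitmap. A self-contained sketch of what such a class might hold; the member and method names are assumptions, not the PR's actual fields:

```java
package org.apache.carbondata.common.iudprocessor.cache;

import org.roaringbitmap.RoaringBitmap;

/**
 * Holds the deleted-row bitmap of one blocklet plus the delete-delta
 * timestamp it was built from, so that stale caches can be detected.
 */
public class BlockletLevelDeleteDeltaDataCache {
  private final RoaringBitmap deletedRows; // row ids deleted within the blocklet
  private final String timestamp;          // delete-delta file timestamp

  public BlockletLevelDeleteDeltaDataCache(int[] deletedRowIds, String timestamp) {
    this.deletedRows = RoaringBitmap.bitmapOf(deletedRowIds);
    this.timestamp = timestamp;
  }

  // True if the given row of the blocklet has been deleted.
  public boolean contains(int rowId) {
    return deletedRows.contains(rowId);
  }

  public String getCacheTimeStamp() {
    return timestamp;
  }
}
```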
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94584912

--- Diff: core/src/main/java/org/apache/carbondata/common/iudprocessor/cache/DeleteDeltaDataCache.java ---
@@ -0,0 +1,29 @@
+package org.apache.carbondata.common.iudprocessor.cache;
+
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Created by S71955 on 06-10-2016.
+ */
+public class DeleteDeltaDataCache {
--- End diff --

Here too, can we rename the class to DeleteDeltaCache?
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Build Success with Spark 1.5.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/455/
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94771626

--- Diff: core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java ---
@@ -114,16 +117,22 @@ private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
       throws FilterUnsupportedException {
     scannedResult.reset();
-    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
-        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
-    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
-        totalBlockletStatistic.getCount() + 1);
-    queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+            .getDataBlock().nodeNumber());
     // apply min max
     if (isMinMaxEnabled) {
-      BitSet bitSet = this.filterExecuter
-          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
-              blocksChunkHolder.getDataBlock().getColumnsMinValue());
+      BitSet bitSet = null;
+      // check for implicit include filter instance
+      if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
--- End diff --

Can you please explain what this is? Since this requirement is for updating and deleting data, why does the filter scanner logic change?
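For readers following the thread: the question concerns the new branch in the min/max pruning step. The following self-contained restatement uses simplified stand-in interfaces, not CarbonData's actual types, to show the decision the diff introduces:

```java
import java.util.BitSet;

public class MinMaxPruningSketch {
  /** Stand-in for the project's filter-executor abstraction. */
  interface FilterExecuter {
    BitSet isScanRequired(byte[][] maxValues, byte[][] minValues);
  }

  /**
   * Stand-in for ImplicitColumnFilterExecutor: filters on the implicit
   * tuple id (added for update/delete) match by blocklet identity, since
   * column min/max statistics say nothing about row-position filters.
   */
  interface ImplicitColumnFilterExecutor extends FilterExecuter {
    BitSet isScanRequired(String blockletId);
  }

  static BitSet prune(FilterExecuter executer, String blockletId,
      byte[][] maxValues, byte[][] minValues) {
    if (executer instanceof ImplicitColumnFilterExecutor) {
      // Implicit include filter: decide from the blocklet id.
      return ((ImplicitColumnFilterExecutor) executer).isScanRequired(blockletId);
    }
    // Regular filter: decide from the column min/max statistics.
    return executer.isScanRequired(maxValues, minValues);
  }
}
```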
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94771952

--- Diff: core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java ---
@@ -151,7 +160,13 @@ private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
       for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
         indexes[index++] = i;
       }
-
+      // loading delete data cache in blockexecutioninfo instance
+      DeleteDeltaCacheLoaderIntf deleteCacheLoader =
--- End diff --

The scanner's responsibility is to scan the data, so why is it loading the delete cache? It should be loaded before scanning, like block loading, if any delete-delta cache exists.
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94772201

--- Diff: core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java ---
@@ -151,7 +160,13 @@ private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
       for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
         indexes[index++] = i;
       }
-
+      // loading delete data cache in blockexecutioninfo instance
+      DeleteDeltaCacheLoaderIntf deleteCacheLoader =
+          new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(), blocksChunkHolder
+              .getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
+      deleteCacheLoader.loadDeleteDeltaFileDataToCache();
+      scannedResult
--- End diff --

Why do we need to set the delete-delta cache in the scan result? I think it would be better to get the delete-delta cache bitset and AND it with the filter scanner's bitset here, then set the resulting bitset on the scan result; the changes would be smaller and the impact reduced (see the sketch below).
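The suggestion is to intersect the filter result with the surviving (non-deleted) rows instead of threading the cache through the scan result. A minimal self-contained sketch of that combination using java.util.BitSet; all names here are hypothetical, not the PR's API:

```java
import java.util.BitSet;

public class DeleteDeltaBitSetExample {
  /**
   * Removes deleted row ids from a filter-scan result, as the reviewer
   * proposes: AND-NOT the filter bitset with the delete-delta bitset so
   * only rows that matched the filter AND were not deleted survive.
   */
  static BitSet applyDeleteDelta(BitSet filterResult, BitSet deletedRows) {
    BitSet surviving = (BitSet) filterResult.clone();
    surviving.andNot(deletedRows); // clear bits that are set in deletedRows
    return surviving;
  }

  public static void main(String[] args) {
    BitSet filterResult = new BitSet();
    filterResult.set(0, 5);          // rows 0..4 matched the filter
    BitSet deletedRows = new BitSet();
    deletedRows.set(2);              // row 2 was deleted by a delete delta
    System.out.println(applyDeleteDelta(filterResult, deletedRows)); // {0, 1, 3, 4}
  }
}
```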
Github user ManoharVanam commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94906541

--- Diff: core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/DeleteDeltaDataUtil.java ---
@@ -0,0 +1,44 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+
+/**
+ * Created by S71955 on 06-10-2016.
+ */
+public class DeleteDeltaDataUtil {
+
+  private DeleteDeltaDataUtil() {
--- End diff --

It looks like this was created as a singleton/utility class but is never used; please check and remove this class.
Github user ManoharVanam commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94907888

--- Diff: core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java ---
@@ -316,57 +444,75 @@ public String getFactDir() {
   /**
    * gets updated timestamp information from given carbon data file name
    */
-  public static String getUpdateTimeStamp(String carbonDataFileName) {
-    // Get the file name from path
+  public static String getTimeStampFromFileName(String carbonDataFileName) {
+    // Get the timestamp portion of the file.
     String fileName = getFileName(carbonDataFileName);
-    // + 1 for size of "-"
-    int firstDashPos = fileName.indexOf("-");
-    int secondDashPos = fileName.indexOf("-", firstDashPos + 1);
-    int startIndex = fileName.indexOf("-", secondDashPos + 1) + 1;
-    int endIndex = fileName.indexOf(".");
+    int startIndex = fileName.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1;
+    int endIndex = fileName.indexOf(".", startIndex);
     return fileName.substring(startIndex, endIndex);
   }

+  /**
-   * gets file part number information from given carbon data file name
+   * This will return the timestamp present in the delete delta file.
+   * @param fileName
+   * @return
    */
-  public static String getPartNo(String carbonDataFileName) {
+  public static String getTimeStampFromDeleteDeltaFile(String fileName) {
+    return fileName.substring(fileName.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+        fileName.lastIndexOf("."));
+  }
+
+  /**
+   * This will return the timestamp present in the delete delta file.
+   * @param fileName
+   * @return
+   */
+  public static String getBlockNameFromDeleteDeltaFile(String fileName) {
+    return fileName.substring(0,
+        fileName.lastIndexOf(CarbonCommonConstants.HYPHEN));
+  }
+
+  /**
+   * gets updated timestamp information from given carbon data file name
+   */
+  public static String getBucketNo(String carbonFilePath) {
     // Get the file name from path
-    String fileName = getFileName(carbonDataFileName);
+    String fileName = getFileName(carbonFilePath);
     // + 1 for size of "-"
-    int startIndex = fileName.indexOf("-") + 1;
+    int firstDashPos = fileName.indexOf("-");
--- End diff --

Use CarbonCommonConstants.HYPHEN here instead of the "-" literal.
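To illustrate the parsing discussed above: judging by the two delete-delta helpers in the diff, such file names end in `<blockName>-<timestamp>.<ext>`, so the block name and timestamp can be split on the last hyphen. A self-contained sketch; the example file name and extension are assumptions, not confirmed by this PR:

```java
public class DeltaFileNameExample {
  private static final String HYPHEN = "-"; // stand-in for CarbonCommonConstants.HYPHEN

  static String timeStampOf(String fileName) {
    // Everything between the last hyphen and the extension dot.
    return fileName.substring(fileName.lastIndexOf(HYPHEN) + 1, fileName.lastIndexOf("."));
  }

  static String blockNameOf(String fileName) {
    // Everything before the last hyphen.
    return fileName.substring(0, fileName.lastIndexOf(HYPHEN));
  }

  public static void main(String[] args) {
    String fileName = "part-0-0-1483344556677.deletedelta"; // illustrative name
    System.out.println(blockNameOf(fileName)); // part-0-0
    System.out.println(timeStampOf(fileName)); // 1483344556677
  }
}
```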
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/492

Build Failed with Spark 1.5.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/464/
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94927699

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---
@@ -407,10 +407,10 @@ public static void executeGraph(LoadModel loadModel, String storeLocation, Strin
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
       String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setTimestamp(readCurrentTime());
+    // loadMetadataDetails.setTimestamp(readCurrentTime());
--- End diff --

Done.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94931363

--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---
@@ -72,23 +74,71 @@ object CarbonOptimizer {
 class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) extends
     Rule[LogicalPlan] with PredicateHelper {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    if (relations.nonEmpty && !isOptimized(plan)) {
+  def apply(logicalPlan: LogicalPlan): LogicalPlan = {
+    if (relations.nonEmpty && !isOptimized(logicalPlan)) {
+      val plan = processPlan(logicalPlan)
+      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
--- End diff --

Why is the plan change required? I don't think it is a good idea to make plan changes in Spark.
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94931905

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java ---
@@ -57,58 +57,58 @@
   }

   @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {
-    try {
-      String outPath = "target/output";
-      CarbonProjection carbonProjection = new CarbonProjection();
-      carbonProjection.addColumn("ID");
-      carbonProjection.addColumn("date");
-      carbonProjection.addColumn("country");
-      carbonProjection.addColumn("name");
-      carbonProjection.addColumn("phonetype");
-      carbonProjection.addColumn("serialname");
-      carbonProjection.addColumn("salary");
-      runJob(outPath, carbonProjection, null);
-      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
-      Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
-    } catch (Exception e) {
-      Assert.assertTrue("failed", false);
-      e.printStackTrace();
-      throw e;
-    }
+//    try {
--- End diff --

Done. Rectified the test case.
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94932078

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---
@@ -358,66 +372,176 @@ private Expression getFilterPredicates(Configuration configuration) {
    * @return list of table block
    * @throws IOException
    */
-  private List<TableBlockInfo> getTableBlockInfo(JobContext job, String segmentId)
-      throws IOException {
+  private List<TableBlockInfo> getTableBlockInfo(JobContext job,
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
+      List<String> updatedTaskList,
+      UpdateVO updateDetails,
+      SegmentUpdateStatusManager updateStatusManager,
+      String segmentId)
+      throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();

     // get file location of all files of given segment
     JobContext newJob =
         new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
-    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
+    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
+        tableSegmentUniqueIdentifier.getSegmentId() + "");

     // identify table blocks
     for (InputSplit inputSplit : getSplitsInternal(newJob)) {
       CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
-      BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
-          carbonInputSplit.getNumberOfBlocklets());
-      tableBlockInfoList.add(
-          new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-              segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-              blockletInfos, carbonInputSplit.getVersion()));
+      // if blockname and update block name is same then cmpare its time stamp with
+      // tableSegmentUniqueIdentifiertimestamp if time stamp is greater
+      // then add as TableInfo object.
+      if (isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit, updateDetails,
+          updateStatusManager, segmentId)) {
+        BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+            carbonInputSplit.getNumberOfBlocklets());
+        tableBlockInfoList.add(
+            new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+                tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
+                carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
+                carbonInputSplit.getBlockStorageIdMap()));
+      }
     }
     return tableBlockInfoList;
   }

+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    String taskID = null;
+    if (null != carbonInputSplit) {
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+        return false;
+      }
+
+      if (null == taskKeys) {
+        return true;
+      }
+
+      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String bucketNo =
+          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+      String blockTimestamp = carbonInputSplit.getPath().getName()
+          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+              carbonInputSplit.getPath().getName().lastIndexOf('.'));
+      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskBucketHolder)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * It returns index for each task file.
    * @param job
    * @param absoluteTableIdentifier
    * @param segmentId
    * @return
    * @throws IOException
+   * @throws IndexBuilderException
    */
   private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
-      CacheClient cacheClient) throws IOException {
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    List<String> updatedTaskList = null;
+    boolean isSegmentUpdated = false;
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper)
+    SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    segmentTaskIndexWrapper =
        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
     if (null != segmentTaskIndexWrapper) {
       segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+        updatedTaskList =
+            statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
+      }
     }

-    // if segment tree is not loaded, load the segment tree
-    if (segmentIndexMap == null) {
-      // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-      List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
-      // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
-
-      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
-      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
-
-      // get Btree blocks for given segment
-      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-      segmentTaskIndexWrapper =
-          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    if (segmentIndexMap == null || isSegmentUpdated) {
+      // if the segment is updated only the updated blocks TableInfo instance has to be
+      // retrieved. the same will be filtered based on taskKeys , if the task is same
+      // for the block then dont add it since already its btree is loaded.
+      List<TableBlockInfo> tableBlockInfoList =
+          getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
+              updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
+              segmentId);
+      if (!tableBlockInfoList.isEmpty()) {
+        // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+        Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+        segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+        // get Btree blocks for given segment
+        tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+        tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+        segmentTaskIndexWrapper =
+            cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+        segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      }
     }
     return segmentIndexMap;
  }

+  public BlockMappingVO getBlockRowCount(JobContext job,
--- End diff --

Done
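To make the validity rule in isValidBlockBasedOnUpdateDetails above easier to follow, here is a distilled, self-contained restatement of its timestamp check; the method and parameter names are illustrative, not the PR's API:

```java
public class BlockValidityExample {
  /**
   * A block written before the segment's first update delta predates the
   * update, so its cached index entry still covers it and it is skipped;
   * a block written at or after that timestamp is a new update-delta block
   * and must be (re)loaded unless its task/bucket is already indexed.
   */
  static boolean needsLoading(long blockTimestamp, Long updateDeltaStartTimestamp,
      boolean taskAlreadyIndexed) {
    if (updateDeltaStartTimestamp != null && blockTimestamp < updateDeltaStartTimestamp) {
      return false; // pre-update block: the existing cached index covers it
    }
    return !taskAlreadyIndexed;
  }

  public static void main(String[] args) {
    System.out.println(needsLoading(100L, 200L, false)); // false: predates the update
    System.out.println(needsLoading(300L, 200L, false)); // true: new update-delta block
    System.out.println(needsLoading(300L, 200L, true));  // false: already indexed
  }
}
```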
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94932320

--- Diff: core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java ---
@@ -102,28 +98,29 @@
     assertEquals(true, result);
   }

-  @Test public void testGetTimeStampWithEmptyTimeStamp() throws Exception {
-    loadMetadataDetails.setLoadStartTime("");
-    Long result = loadMetadataDetails.getLoadStartTimeAsLong();
-    assertNull(result);
-  }
-
-  @Test public void testGetTimeStampWithParserException() throws Exception {
-    loadMetadataDetails.setLoadStartTime("00.00.00");
-    Long result = loadMetadataDetails.getLoadStartTimeAsLong();
-    assertNull(result);
-  }
+//  @Test public void testGetTimeStampWithEmptyTimeStamp() throws Exception {
--- End diff --

Done