ajantha-bhat commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-651585538

@VenuReddy2103: I have finished the review. Please check the comments. Also add test cases for: a) multiple tasks writing data and index to a partition with mergeindex = false; b) partition + update + merge index = false.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email]
VenuReddy2103 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r447453448

File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala

@@ -640,6 +653,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
    }
  }

+  override def afterEach(): Unit = {
+    CarbonProperties.getInstance()

Review comment: Actually, it is good practice to do the cleanup/reset in afterEach, so that even if a test case fails abnormally with an exception, the state still gets cleaned up/reset in afterEach. Any test cases (if they exist) after it are then not affected by the previous test case's failure.
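The guarantee being argued for here — cleanup runs even when the test body throws — can be sketched in isolation. The snippet below is a minimal, self-contained illustration in plain Java (no test framework; the class and method names are hypothetical, not from the PR):

```java
public class CleanupSketch {
    // Shared state that a test may mutate, standing in for a CarbonProperties setting.
    static String property = "default";

    // Runs a test body and always restores the shared property afterwards,
    // mirroring what an afterEach hook guarantees even when the body throws.
    static void runWithCleanup(Runnable testBody) {
        try {
            testBody.run();
        } finally {
            property = "default";
        }
    }

    public static void main(String[] args) {
        try {
            runWithCleanup(() -> {
                property = "overridden";                    // the test changes global state
                throw new RuntimeException("test failed");  // and then fails abnormally
            });
        } catch (RuntimeException expected) {
            // the failure propagates, but the cleanup in finally already ran
        }
        System.out.println(property); // prints "default"
    }
}
```

Relying on each test to reset state at its end would skip the reset whenever the test throws before reaching it; the finally-style hook avoids that.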
ajantha-bhat commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r447455176

File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala

@@ -640,6 +653,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
    }
  }

+  override def afterEach(): Unit = {
+    CarbonProperties.getInstance()

Review comment: If a test case fails, we need to fix the test case or the code. We should not add afterEach just because a test case can fail; it is redundant for the other test cases and will increase CI time.
kumarvishal09 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r449384678

File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java

@@ -302,6 +318,61 @@ private void commitJobForPartition(JobContext context, boolean overwriteSet,
     commitJobFinal(context, loadModel, operationContext, carbonTable, uniqueId);
   }

+  /**
+   * Method to create and write the segment file, removes the temporary directories from all the
+   * respective partition directories. This method is invoked only when {@link
+   * CarbonCommonConstants#CARBON_MERGE_INDEX_IN_SEGMENT} is disabled.
+   * @param context Job context
+   * @param loadModel Load model
+   * @param segmentFileName Segment file name to write
+   * @param partitionPath Serialized list of partition location
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  private void writeSegmentWithoutMergeIndex(JobContext context, CarbonLoadModel loadModel,
+      String segmentFileName, String partitionPath) throws IOException {
+    Map<String, String> indexFileNameMap = (Map<String, String>) ObjectSerializationUtil
+        .convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+    List<String> partitionList =
+        (List<String>) ObjectSerializationUtil.convertStringToObject(partitionPath);
+    SegmentFileStore.SegmentFile finalSegmentFile = null;
+    boolean isRelativePath;
+    String partitionLoc;
+    for (String partition : partitionList) {
+      isRelativePath = false;
+      partitionLoc = partition;
+      if (partitionLoc.startsWith(loadModel.getTablePath())) {
+        partitionLoc = partitionLoc.substring(loadModel.getTablePath().length());
+        isRelativePath = true;
+      }
+      SegmentFileStore.SegmentFile segmentFile = new SegmentFileStore.SegmentFile();
+      SegmentFileStore.FolderDetails folderDetails = new SegmentFileStore.FolderDetails();
+      folderDetails.setFiles(Collections.singleton(indexFileNameMap.get(partition)));
+      folderDetails.setPartitions(
+          Collections.singletonList(partitionLoc.substring(partitionLoc.indexOf("/") + 1)));
+      folderDetails.setRelative(isRelativePath);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      segmentFile.getLocationMap().put(partitionLoc, folderDetails);
+      if (finalSegmentFile != null) {
+        finalSegmentFile = finalSegmentFile.merge(segmentFile);
+      } else {
+        finalSegmentFile = segmentFile;
+      }
+    }
+    Objects.requireNonNull(finalSegmentFile);
+    String segmentFilesLocation =

Review comment: It's better to move this code inside SegmentFileStore itself: pass the table path and segment file name, and let it handle the folder creation internally. Please check, it may already be present.

    String segmentFilesLocation =
        CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath());
    CarbonFile locationFile = FileFactory.getCarbonFile(segmentFilesLocation);
    if (!locationFile.exists()) {
      locationFile.mkdirs();
    }
    SegmentFileStore.writeSegmentFile(finalSegmentFile,
        segmentFilesLocation + "/" + segmentFileName + CarbonTablePath.SEGMENT_EXT);
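The refactor suggested here — a helper that takes only the table path and segment file name and handles directory creation internally — might look roughly like the sketch below. This is a simplified, self-contained stand-in, not CarbonData's API: the class name, the `Metadata/segments` layout, and the plain `java.io` calls are all assumptions for illustration.

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class SegmentFileWriterSketch {

    // Hypothetical helper along the lines the reviewer suggests: the caller
    // passes only the table path and the segment file name; the segment-files
    // directory is derived and created here, not at every call site.
    public static File writeSegmentFile(String tablePath, String segmentFileName,
                                        String content) throws IOException {
        File segmentDir = new File(tablePath, "Metadata/segments"); // assumed layout
        if (!segmentDir.exists() && !segmentDir.mkdirs()) {
            throw new IOException("Could not create " + segmentDir);
        }
        File segmentFile = new File(segmentDir, segmentFileName + ".segment");
        try (FileWriter writer = new FileWriter(segmentFile)) {
            writer.write(content);
        }
        return segmentFile;
    }

    public static void main(String[] args) throws IOException {
        File tableDir = new File(System.getProperty("java.io.tmpdir"), "sketch_table");
        File written = writeSegmentFile(tableDir.getPath(), "0_1593500000000", "{}");
        System.out.println(written.exists()); // prints "true"
    }
}
```

With such a helper, the commit code shrinks to a single write call and cannot forget the exists/mkdirs check, which is the reviewer's point.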
kumarvishal09 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r449386951

File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java (same writeSegmentWithoutMergeIndex hunk, at the line `if (finalSegmentFile != null) {`)

Review comment: @ajantha-bhat the code looks fine, it's in a loop.
ajantha-bhat commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r449387619

File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java (same writeSegmentWithoutMergeIndex hunk, at the line `if (finalSegmentFile != null) {`)

Review comment: Yes, it is in a loop. Ignore this comment.
VenuReddy2103 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r450746967

File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java

@@ -282,10 +296,12 @@ private void commitJobForPartition(JobContext context, boolean overwriteSet,
        throw new IOException(e);
      }
    }
-    String segmentFileName = SegmentFileStore.genSegmentFileName(
-        loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
-    newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
+    if (isMergeIndex) {

Review comment: loadModel.getMetrics().getMergeIndexSize() is filled in MergeIndexEventListener.onEvent() when the merge index is created. So this can't be made the else case of line 280.
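The reasoning above — the merged size only exists when the merge-index listener ran, so the non-merged path must compute the size another way — can be sketched as follows. This is a hypothetical simplification with made-up names and sizes, not the PR's actual code:

```java
public class IndexSizeSketch {

    // Stand-in for the metric the merge-index event listener fills in;
    // it is only meaningful when merge index is enabled.
    static long mergeIndexSizeFromMetrics = 455L;

    // Stand-in for the per-task .carbonindex file sizes collected from the
    // commit protocol when merge index is disabled.
    static long[] perTaskIndexFileSizes = {120L, 340L};

    // Chooses the index size to record in the segment metadata: the merged
    // size when merge index is enabled, otherwise the sum of the individual
    // index files. The two branches draw on different data sources, which is
    // why one cannot simply be the else case of the other assignment.
    static long resolveIndexSize(boolean isMergeIndex) {
        if (isMergeIndex) {
            return mergeIndexSizeFromMetrics;
        }
        long sum = 0L;
        for (long size : perTaskIndexFileSizes) {
            sum += size;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(resolveIndexSize(true));  // prints 455
        System.out.println(resolveIndexSize(false)); // prints 460
    }
}
```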
VenuReddy2103 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r450765502

File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala

@@ -253,6 +255,14 @@ case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, i
       if (size.isDefined) {
         dataSize = dataSize + java.lang.Long.parseLong(size.get)
       }
+      val indexSize = map.get("carbon.indexsize")
+      if (indexSize.isDefined) {
+        indexLen = indexLen + java.lang.Long.parseLong(indexSize.get)
+      }
+      val indexFiles = map.get("carbon.index.files.name")
+      if (indexFiles.isDefined) {
+        indexFilesName = indexFiles.get

Review comment: It is a serialized map. Like "carbon.output.partitions.name" and "carbon.output.files.name", "carbon.index.files.name" is also serialized. We deserialize it and use it in writeSegmentWithoutMergeIndex().
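The round trip being described — serialize a map to a String, stash it under a configuration key, then deserialize it on the commit path — can be mimicked with plain Java serialization and Base64. This is a minimal sketch of the idea only; the real ObjectSerializationUtil may differ (for example, by also compressing the bytes), and the map contents below are made up:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;
import java.util.HashMap;

public class SerializationSketch {

    // Serialize an object to a printable String, so it can travel through a
    // String-valued configuration entry such as "carbon.index.files.name".
    public static String convertObjectToString(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // Reverse of the above: decode and deserialize back into the object.
    public static Object convertStringToObject(String encoded)
            throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String> indexFiles = new HashMap<>();
        indexFiles.put("/tbl/country=IN", "part-0.carbonindex"); // made-up entry
        String encoded = convertObjectToString(indexFiles);  // stored as a config value
        Object decoded = convertStringToObject(encoded);     // read back at commit time
        System.out.println(indexFiles.equals(decoded));      // prints "true"
    }
}
```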
CarbonDataQA1 commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-655060959 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1585/
CarbonDataQA1 commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-655062212 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3325/
VenuReddy2103 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r451275288

File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala (same afterEach hunk as quoted above)

Review comment: OK. Removed this afterEach.
VenuReddy2103 commented on a change in pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#discussion_r451275614

File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java (same writeSegmentWithoutMergeIndex hunk, at the line `String segmentFilesLocation =`)

Review comment: Agreed. Moved this code to SegmentFileStore.
CarbonDataQA1 commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-655588445 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1589/
CarbonDataQA1 commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-655589396 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3329/
CarbonDataQA1 commented on pull request #3776: URL: https://github.com/apache/carbondata/pull/3776#issuecomment-655669978 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3330/