GitHub user QiangCai opened a pull request:
https://github.com/apache/carbondata/pull/1485

[CARBONDATA-1572][Streaming] Add test case for streaming ingest

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [X] Any interfaces changed? No
- [X] Any backward compatibility impacted? No
- [X] Document update required? No
- [X] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. Yes

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata testcase_for_streaming

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1485.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1485

----

commit e69cd0c3d51f171323ffe13c7a2cc2fc30bed944
Author: QiangCai <[hidden email]>
Date:   2017-11-10T11:57:17Z

    add test case for streaming ingest

----
--- |
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1485

Please list the test suite added in the PR description.
--- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150256214

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---
@@ -51,6 +51,14 @@
   private static final String LOAD_Model = "mapreduce.output.carbon.load.model";

+  private static final String SEGMENT_ID = "carbon.segment.id";
+
+  public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+  public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+  public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;

--- End diff --

Please add comments for these constants.
--- |
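[Editor's note: as an illustration of what the reviewer is asking for, here is one way these constants could be documented. The comment wording is a sketch, not the wording that actually landed in the PR; the only grounded facts are the values themselves (byte counts) and the handoff behavior described later in this thread.]

```java
// Illustrative only -- the actual comments added in the PR may differ.

/** Configuration key holding the id of the streaming segment being written. */
private static final String SEGMENT_ID = "carbon.segment.id";

/** Configuration key for the streaming-segment handoff threshold, in bytes. */
public static final String HANDOFF_SIZE = "carbon.handoff.size";

/** Minimum allowed handoff threshold: 64 MB. */
public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;

/** Default handoff threshold: 1 GB. */
public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
```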
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150263213

--- Diff: core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java ---
@@ -147,5 +149,39 @@ public static void tearDown() {
     FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
     assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
   }
+
+  @Test public void testTruncateFile() {
+    FileWriter writer = null;
+    try {
+      // generate a file
+      String path = new File("truncatFile").getCanonicalPath();
+      writer = new FileWriter(path);
+      for (int i = 0; i < 4000; i++) {
+        writer.write("test truncate file method");
+      }
+      writer.close();
+      CarbonFile file = FileFactory.getCarbonFile(path);
+      assertTrue(file.getSize() == 100000L);
+
+      // truncate file to 4000 bytes
+      FileFactory.truncateFile(
+          path,
+          FileFactory.getFileType(path),
+          4000);
+      file = FileFactory.getCarbonFile(path);
+      assertTrue(file.getSize() == 4000L);

--- End diff --

Better to use assertEquals so that it will print the size on failure.
--- |
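[Editor's note: for illustration, this is the difference the reviewer is pointing at. `assertTrue` on a comparison reports only a bare failure, while JUnit's `assertEquals(expected, actual)` reports both values; the failing size in the second comment is made up.]

```java
// assertTrue gives an unhelpful failure message:
assertTrue(file.getSize() == 4000L);
// java.lang.AssertionError  (no sizes printed)

// assertEquals prints both values when it fails:
assertEquals(4000L, file.getSize());
// java.lang.AssertionError: expected:<4000> but was:<3975>  (value illustrative)
```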
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150265010

--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
   }

   public long getLoadEndTime() {
+    if (timestamp == null) {

--- End diff --

Can it be null? Why?
--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/984/
--- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/1600/
--- |
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384766

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---
@@ -51,6 +51,14 @@
   private static final String LOAD_Model = "mapreduce.output.carbon.load.model";

+  private static final String SEGMENT_ID = "carbon.segment.id";
+
+  public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+  public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+  public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;

--- End diff --

fixed
--- |
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384776

--- Diff: core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java ---
@@ -147,5 +149,39 @@ public static void tearDown() {
     FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
     assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
   }
+
+  @Test public void testTruncateFile() {
+    FileWriter writer = null;
+    try {
+      // generate a file
+      String path = new File("truncatFile").getCanonicalPath();
+      writer = new FileWriter(path);
+      for (int i = 0; i < 4000; i++) {
+        writer.write("test truncate file method");
+      }
+      writer.close();
+      CarbonFile file = FileFactory.getCarbonFile(path);
+      assertTrue(file.getSize() == 100000L);
+
+      // truncate file to 4000 bytes
+      FileFactory.truncateFile(
+          path,
+          FileFactory.getFileType(path),
+          4000);
+      file = FileFactory.getCarbonFile(path);
+      assertTrue(file.getSize() == 4000L);

--- End diff --

fixed
--- |
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384785

--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
   }

   public long getLoadEndTime() {
+    if (timestamp == null) {

--- End diff --

For a stream segment, it is null until the segment size reaches the handoff size.
--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1009/
--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1014/
--- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/1628/
--- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/1632/
--- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411303

--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
   }

   public long getLoadEndTime() {
+    if (timestamp == null) {

--- End diff --

Can you replace `-1` with a meaningful constant, to improve readability?
--- |
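[Editor's note: a sketch of the change being requested. The constant name `SEGMENT_LOAD_TIME_DEFAULT` and the conversion helper `convertTimeStampToLong` are assumptions for illustration, not necessarily what the PR ended up using.]

```java
// Hypothetical constant name -- the PR may have chosen differently.
public static final long SEGMENT_LOAD_TIME_DEFAULT = -1;

public long getLoadEndTime() {
  if (timestamp == null) {
    // A streaming segment has no end time until its size reaches
    // the handoff size, so report the "not finished" sentinel.
    return SEGMENT_LOAD_TIME_DEFAULT;
  }
  return convertTimeStampToLong(timestamp); // assumed existing conversion helper
}
```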
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411342

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  private AbsoluteTableIdentifier identifier;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    storePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+        FileFormat.rowformat);
+  }
+
+  @Test public void testCreateRecordReader() {
+    try {
+      InputSplit inputSplit = buildInputSplit();
+      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      Assert.assertNotNull("Failed to create record reader", recordReader);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
--- End diff --

Since it is an input format, can you add a map reduce test case to test it?
--- |
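[Editor's note: a sketch of the map-reduce-style test being requested -- create the reader, then drive it through the same initialize/next/close cycle a map task uses. Illustrative only: with the empty split from `buildInputSplit()` above, the loop would run zero times, so a real test would first write stream data for the split to point at.]

```java
@Test public void testReadRecordsLikeMapTask() throws Exception {
  InputSplit inputSplit = buildInputSplit();
  CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
  RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);

  // A map task calls initialize, iterates nextKeyValue, then closes.
  recordReader.initialize(inputSplit, taskAttemptContext);
  int rowCount = 0;
  while (recordReader.nextKeyValue()) {
    Assert.assertNotNull(recordReader.getCurrentValue());
    rowCount++;
  }
  recordReader.close();
  // With real stream data, assert the expected row count here.
}
```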
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411356

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+  private Configuration hadoopConf;
+  private TaskAttemptID taskAttemptId;
+  private CarbonLoadModel carbonLoadModel;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    super.setUp();
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    hadoopConf.set("mapred.job.id", jobId.toString());
+    hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+    hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+    hadoopConf.setBoolean("mapred.task.is.map", true);
+    hadoopConf.setInt("mapred.task.partition", 0);
+
+    storePath = new File("target/stream_output").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_output";
+    AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    CarbonTable table = StoreCreator.createTable(identifier);
+
+    String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+  }
+
+  @Test public void testSetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+    }
+  }
+
+  @Test public void testGetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+      Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+      Assert.assertTrue("CarbonLoadModel should be same with previous",
+          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+    }
+  }
+
+  @Test public void testGetRecordWriter() {
+    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      TaskAttemptContext taskAttemptContext =
+          new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
--- End diff --

Since it is an output format, can you add a map reduce test case for it?
--- |
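[Editor's note: similarly, a sketch of how a map-reduce-style write test might drive the record writer through a map task's lifecycle. The actual row writing is left as a comment because the value type depends on the table schema in the load model; this is not the test the PR added.]

```java
@Test public void testWriteRecordsLikeMapTask() throws Exception {
  CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
  TaskAttemptContext context = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
  RecordWriter recordWriter = new CarbonStreamOutputFormat().getRecordWriter(context);
  try {
    // A map task would call recordWriter.write(key, value) once per input row;
    // the value must match the table schema configured in carbonLoadModel.
  } finally {
    // close() flushes and finalizes the stream output for this task attempt.
    recordWriter.close(context);
  }
}
```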
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411369

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---
@@ -74,8 +74,16 @@ object CarbonStore {
         Row(
           load.getLoadName,
           load.getLoadStatus,
-          new java.sql.Timestamp(load.getLoadStartTime),
-          new java.sql.Timestamp(load.getLoadEndTime),
+          if (load.getLoadStartTime == -1) {

--- End diff --

Replace `-1` with a constant.
--- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411392

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---
@@ -74,8 +74,16 @@ object CarbonStore {
         Row(
           load.getLoadName,
           load.getLoadStatus,
-          new java.sql.Timestamp(load.getLoadStartTime),
-          new java.sql.Timestamp(load.getLoadEndTime),
+          if (load.getLoadStartTime == -1) {

--- End diff --

I think it is better to move the `if` block before line 74, like `mergedTo`.
--- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411411

--- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
     }
     conf
   }
+  // segment max size

--- End diff --

Size in MB? Please state the unit in the comment.
--- |