Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150431469

--- Diff: integration/spark2/src/test/scala/org/apache/carbondata/streaming/CarbonAppendableStreamTestCase.scala ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.types.StructType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+
+/**
+ * Test case for org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+ */
+class CarbonAppendableStreamTestCase extends Spark2QueryTest with BeforeAndAfterAll {
--- End diff --

please merge to `TestStreamingTableOperation`

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150443123

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  private AbsoluteTableIdentifier identifier;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    storePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+        FileFormat.rowformat);
+  }
+
+  @Test public void testCreateRecordReader() {
+    try {
+      InputSplit inputSplit = buildInputSplit();
+      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      Assert.assertNotNull("Failed to create record reader", recordReader);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
--- End diff --

I will add the test case in another PR.

---
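For reference, a minimal sketch of what the promised follow-up read test might look like, meant to live inside the test class quoted above. It is a hypothetical addition, not part of this PR: it relies only on the standard Hadoop RecordReader contract (initialize/nextKeyValue/getCurrentValue/close) and assumes the split points at a real streaming segment file, since the empty CarbonInputSplit built by buildInputSplit() would yield no rows:

  // Hypothetical follow-up test; not part of this PR.
  @Test public void testReadRecords() throws Exception {
    InputSplit inputSplit = buildInputSplit();
    CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
    RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
    recordReader.initialize(inputSplit, taskAttemptContext);
    int rowCount = 0;
    // advance through the stream one record at a time
    while (recordReader.nextKeyValue()) {
      Assert.assertNotNull("Row should not be null", recordReader.getCurrentValue());
      rowCount++;
    }
    recordReader.close();
    Assert.assertTrue("Row count should be non-negative", rowCount >= 0);
  }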
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150443128

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+  private Configuration hadoopConf;
+  private TaskAttemptID taskAttemptId;
+  private CarbonLoadModel carbonLoadModel;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    super.setUp();
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    hadoopConf.set("mapred.job.id", jobId.toString());
+    hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+    hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+    hadoopConf.setBoolean("mapred.task.is.map", true);
+    hadoopConf.setInt("mapred.task.partition", 0);
+
+    storePath = new File("target/stream_output").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_output";
+    AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    CarbonTable table = StoreCreator.createTable(identifier);
+
+    String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+  }
+
+  @Test public void testSetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to configure CarbonLoadModel for CarbonStreamOutputFormat", false);
+    }
+  }
+
+  @Test public void testGetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+      Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+      Assert.assertTrue("CarbonLoadModel should be the same as the previous one",
+          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFormat", false);
+    }
+  }
+
+  @Test public void testGetRecordWriter() {
+    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      TaskAttemptContext taskAttemptContext =
+          new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
  }
--- End diff --

I will add the test case in another PR.

---
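Along the same lines, a hedged sketch of the promised writer test, also meant to live inside the test class above. The key/value types that CarbonStreamRecordWriter expects are not visible in this diff, so no write() call is shown; the sketch only checks that an unused writer closes cleanly, which is an assumption about its contract rather than documented behavior:

  // Hypothetical follow-up test; not part of this PR.
  @Test public void testCloseRecordWriter() throws Exception {
    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
    CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
    TaskAttemptContext taskAttemptContext =
        new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
    RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
    // closing without any writes should flush an empty stream block (assumed behavior)
    recordWriter.close(taskAttemptContext);
  }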
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150456900

--- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
     }
     conf
   }
+  // segment max size
--- End diff --

The unit is bytes; I will add a comment.

---
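To make the unit explicit, here is a small self-contained illustration of how a byte-valued threshold can be read from a Hadoop Configuration. The property key and the 1 GB default below are placeholders for illustration, not the PR's actual values:

  import org.apache.hadoop.conf.Configuration;

  public class SegmentSizeExample {
    public static void main(String[] args) {
      Configuration hadoopConf = new Configuration();
      // segment max size, in bytes: the streaming segment is handed off
      // once its accumulated data grows past this threshold
      long segmentMaxSize = hadoopConf.getLong(
          "carbon.streaming.segment.max.size", // hypothetical property key
          1024L * 1024L * 1024L);              // assumed default: 1 GB, in bytes
      System.out.println("segment max size (bytes): " + segmentMaxSize);
    }
  }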
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150457381

--- Diff: integration/spark2/src/test/scala/org/apache/carbondata/streaming/CarbonAppendableStreamTestCase.scala ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.types.StructType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+
+/**
+ * Test case for org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+ */
+class CarbonAppendableStreamTestCase extends Spark2QueryTest with BeforeAndAfterAll {
--- End diff --

fixed

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1046/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/1664/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1049/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/1667/

---