Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149078482

--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegmentManager.java ---

@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.segment;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletIndex;
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * streaming segment manager
+ */
+public class StreamSegmentManager {

--- End diff --

Can we unify it with `SegmentStatusManager`?

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149283558

--- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---

@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.DataLoadingUtil
+import org.apache.carbondata.streaming.segment.StreamSegmentManager
+
+/**
+ * Stream sink factory
+ */
+object StreamSinkFactory {
+
+  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
+
+  def createStreamTableSink(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String]): Sink = {
+    validateParameters(parameters)
+
+    // prepare the stream segment
+    val segmentId = getStreamSegmentId(carbonTable)
+    // build load model
+    val carbonLoadModel = buildCarbonLoadModelForStream(
+      sparkSession,
+      carbonTable,
+      parameters,
+      segmentId)
+    // start server if necessary
+    val server = startDictionaryServer(

--- End diff --

This method will add a SparkListener to the SparkContext; it will trigger shutdown on application end.

---
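[Editor's note: the pattern QiangCai describes is registering a listener on the SparkContext so a side service is torn down when the application ends. The sketch below is a minimal illustration of that pattern, not the PR's actual code; the `registerShutdownListener` helper and the `server.shutdown()` call are assumptions about the API.]

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    import org.apache.carbondata.core.dictionary.server.DictionaryServer

    // Tie the dictionary server's lifecycle to the Spark application:
    // when the application ends, the listener stops the server so it
    // does not outlive the streaming job.
    def registerShutdownListener(sc: SparkContext, server: DictionaryServer): Unit = {
      sc.addSparkListener(new SparkListener {
        override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
          server.shutdown() // assumed shutdown method; the real API may differ
        }
      })
    }

---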
Github user chenliang613 commented on the issue:
https://github.com/apache/carbondata/pull/1470

LGTM

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1470

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/853/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1470

SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1484/

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149600151

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---

@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+  // basic info
+  private Configuration hadoopConf;
+  private CarbonDataLoadConfiguration configuration;
+  private String segmentId;
+  private int taskNo;
+  private CarbonTable carbonTable;
+  private int maxRowNums;
+  private int maxCacheSize;
+
+  // parser and converter
+  private RowParser rowParser;
+  private RowConverter converter;
+  private CarbonRow currentRow = new CarbonRow(null);
+
+  // encoder
+  private DataField[] dataFields;
+  private BitSet nullBitSet;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private int dimensionWithComplexCount;
+  private int measureCount;
+  private int[] measureDataTypes;
+  private StreamBlockletWriter output = null;
+
+  // data write
+  private String segmentDir;
+  private String fileName;
+  private DataOutputStream outputStream;
+  private boolean isFirstRow = true;
+  private boolean hasException = false;
+
+  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+    initialize(job);
+  }
+
+  /**
+   *
+   */
+  private void initialize(TaskAttemptContext job) throws IOException {
+    // set basic information
+    hadoopConf = job.getConfiguration();
+    CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+    if (carbonLoadModel == null) {
+      throw new IOException(
+          "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+    }
+    segmentId = carbonLoadModel.getSegmentId();
+    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+    taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+    carbonLoadModel.setTaskNo("" + taskNo);
+    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+    // try recover data file from fault for task at first
+    tryRecoverFromFault();
+  }
+
+  /**
+   * try recover data file from fault for task
+   */
+  private void tryRecoverFromFault() throws IOException {
+    CarbonTablePath tablePath =
+        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+    segmentDir = tablePath.getSegmentDir("0", segmentId);
+    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+    String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
+    CarbonStreamRecordWriter.recoverDataFile(segmentDir, fileName, indexName);
+  }
+
+  public static void recoverDataFile(String segmentDir, String fileName, String indexName)

--- End diff --

Better to keep the file-recovery and segment-recovery handling functions in the same class.

---
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149607861

--- Diff: pom.xml ---

@@ -333,7 +333,7 @@
         <version>3.0.4</version>
         <configuration>
           <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
-          <failOnError>true</failOnError>
+          <failOnError>false</failOnError>

--- End diff --

Why change this to 'false'?

---
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149608765

--- Diff: pom.xml ---

@@ -478,6 +471,7 @@
     <module>integration/spark2</module>
    <module>integration/hive</module>
    <module>integration/presto</module>
+    <module>streaming</module>

--- End diff --

Please add the streaming module to the "build-all" profile.

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149617386

--- Diff: pom.xml ---

@@ -333,7 +333,7 @@
         <version>3.0.4</version>
         <configuration>
           <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
-          <failOnError>true</failOnError>
+          <failOnError>false</failOnError>

--- End diff --

I will revert it.

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149617389

--- Diff: pom.xml ---

@@ -478,6 +471,7 @@
     <module>integration/spark2</module>
    <module>integration/hive</module>
    <module>integration/presto</module>
+    <module>streaming</module>

--- End diff --

Fixed.

---
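[Editor's note: for readers unfamiliar with Maven profiles, a "build-all" profile gathers optional modules in one place, and a module missing from it will silently be skipped when building with that profile. The snippet below is an illustrative sketch only, not the actual CarbonData pom.xml; the surrounding module list is assumed.]

    <profile>
      <id>build-all</id>
      <modules>
        <!-- ...existing modules such as integration/spark2... -->
        <!-- the new module must also be listed here, otherwise
             `mvn -Pbuild-all` will not build it -->
        <module>streaming</module>
      </modules>
    </profile>

---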
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1470

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/877/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1470

SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1490/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1470

SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1491/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1470

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/887/

---
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1470

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1470

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/896/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1470

SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1502/

---
Github user QiangCai commented on the issue:
https://github.com/apache/carbondata/pull/1470

test this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1470

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/900/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1470

SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1511/

---