Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3452/
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1867
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3627/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1867
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2387/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3469/
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
retest sdv please
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3476/
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
retest this please
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
retest sdv please
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3481/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1867
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3653/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1867
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2414/
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
retest sdv please
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3490/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1867
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3491/
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
retest sdv please
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1867
Can anyone help review this PR?
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1867#discussion_r170854883
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Create CarbonStreamSparkStreamingWriter for stream table
--- End diff --

change to `Create [[CarbonStreamSparkStreamingWriter]] for stream table`
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1867#discussion_r170856116
--- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+class CarbonStreamSparkStreamingWriter {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private var isInitialize: Boolean = false
+
+  private var lock: ICarbonLock = null
+  private var carbonTable: CarbonTable = null
+  private var configuration: Configuration = null
+  private var carbonAppendableStreamSink: Sink = null
+  private val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
+
+  def this(carbonTable: CarbonTable, configuration: Configuration) {
+    this()
+    this.carbonTable = carbonTable
+    this.configuration = configuration
+    this.option("dbName", carbonTable.getDatabaseName)
+    this.option("tableName", carbonTable.getTableName)
+  }
+
+  /**
+   * Acquired the lock for stream table
+   */
+  def lockStreamTable(): Unit = {
+    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (lock.lockWithRetries()) {
+      LOGGER.info("Acquired the lock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    } else {
+      LOGGER.error("Not able to acquire the lock for stream table:" +
+                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+      throw new InterruptedException(
+        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
+        carbonTable.getTableName)
+    }
+  }
+
+  /**
+   * unlock for stream table
+   */
+  def unLockStreamTable(): Unit = {
+    if (null != lock) {
+      lock.unlock()
+      LOGGER.info("unlock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    }
+  }
+
+  def initialize(): Unit = {
+    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
+      sparkSession,
+      configuration,
+      carbonTable,
+      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
+
+    lockStreamTable()
+
+    isInitialize = true
+  }
+
+  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
+    if (!isInitialize) {
+      initialize()
+    }
+    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
+  }
+
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var mode: SaveMode = SaveMode.ErrorIfExists
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `SaveMode.Overwrite`: overwrite the existing data.
+   *   - `SaveMode.Append`: append the data.
+   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+   */
+  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      mode = saveMode
+    }
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `SaveMode.Overwrite`: overwrite the existing data.
--- End diff --

The `saveMode` parameter here is a String, so change the description accordingly.
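For illustration only, a minimal sketch of what a String overload with a matching description could look like, modelled loosely on the string options that Spark's `DataFrameWriter.mode(saveMode: String)` documents. `SaveModeStandIn` and `ModeOnlyWriterSketch` are hypothetical names introduced so the snippet compiles without Spark; this is not the code from the PR.

```scala
import java.util.Locale

// Hypothetical stand-in for org.apache.spark.sql.SaveMode, so the sketch compiles without Spark.
sealed trait SaveModeStandIn
object SaveModeStandIn {
  case object Overwrite extends SaveModeStandIn
  case object Append extends SaveModeStandIn
  case object Ignore extends SaveModeStandIn
  case object ErrorIfExists extends SaveModeStandIn
}

// Hypothetical writer: only the String-based mode parsing and its Scaladoc are sketched.
class ModeOnlyWriterSketch {
  private var selectedMode: SaveModeStandIn = SaveModeStandIn.ErrorIfExists

  def getMode: SaveModeStandIn = selectedMode

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error` or `default`: default option, throw an exception at runtime.
   *
   * Matching is case-insensitive; any other string is rejected.
   */
  def mode(saveMode: String): ModeOnlyWriterSketch = {
    selectedMode = saveMode.toLowerCase(Locale.ROOT) match {
      case "overwrite" => SaveModeStandIn.Overwrite
      case "append" => SaveModeStandIn.Append
      case "ignore" => SaveModeStandIn.Ignore
      case "error" | "default" => SaveModeStandIn.ErrorIfExists
      case _ => throw new IllegalArgumentException(
        s"Unknown save mode: $saveMode. " +
          "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
    }
    this
  }
}
```

Rejecting unknown strings with an exception, instead of silently keeping the default, makes a typo such as "apend" fail immediately.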
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1867#discussion_r170856731
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Create CarbonStreamSparkStreamingWriter for stream table
+ * when integrate with Spark Streaming
+ */
+object CarbonSparkStreamFactory {
+
+  def getStreamSparkStreamWriter(
+      dbNameStr: String,
+      tableName: String): CarbonStreamSparkStreamingWriter =
+    synchronized {
+      val dbName = if (StringUtils.isEmpty(dbNameStr)) "default" else dbNameStr
+      val key = dbName + "." + tableName
+      if (CarbonStreamSparkStreaming.getTableMap.containsKey(key)) {
+        CarbonStreamSparkStreaming.getTableMap.get(key)
+      } else {
+        if (StringUtils.isEmpty(tableName) || tableName.contains(" ")) {
+          throw new CarbonStreamException("Table creation failed. " +
+                                          "Table name must not be blank or " +
+                                          "cannot contain blank space")
+        }
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
+          tableName)(SparkSession.builder().getOrCreate())
--- End diff --

The SparkSession is built twice, at line 47 and again at line 53; build it once and reuse it.
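A minimal sketch of the shape the reviewer asks for, assuming the only point is to obtain the session once per writer creation and reuse it. `SessionStandIn`, `WriterStandIn`, `buildSession` and `StreamWriterFactorySketch` are hypothetical stand-ins (for `SparkSession`, `CarbonStreamSparkStreamingWriter`, `SparkSession.builder().getOrCreate()` and the factory) so the snippet runs without Spark or CarbonData. In the real factory the equivalent would be one local `val` holding the session, passed to `CarbonEnv.getCarbonTable(Some(dbName), tableName)(...)` and reused at the later use site (line 53, which the quoted diff does not show). Since `getOrCreate()` returns an existing session if there is one, the second call is redundant work rather than a second session.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for SparkSession; records which "build" produced it.
final class SessionStandIn(val buildNumber: Int)

// Hypothetical stand-in for CarbonStreamSparkStreamingWriter.
final class WriterStandIn(val key: String, val session: SessionStandIn)

object StreamWriterFactorySketch {
  // Counts builder invocations so the "build once" rule can be checked.
  val sessionBuilds = new AtomicInteger(0)

  // Hypothetical stand-in for SparkSession.builder().getOrCreate().
  private def buildSession(): SessionStandIn =
    new SessionStandIn(sessionBuilds.incrementAndGet())

  // Hypothetical stand-in for CarbonStreamSparkStreaming.getTableMap.
  private val writers = new ConcurrentHashMap[String, WriterStandIn]()

  def getWriter(dbNameStr: String, tableName: String): WriterStandIn = synchronized {
    val dbName = if (dbNameStr == null || dbNameStr.isEmpty) "default" else dbNameStr
    val key = dbName + "." + tableName
    Option(writers.get(key)).getOrElse {
      if (tableName == null || tableName.isEmpty || tableName.contains(" ")) {
        throw new IllegalArgumentException(
          "Table name must not be blank or contain blank space")
      }
      // Build the session once and hand the same value to every step
      // that needs it (table lookup, writer construction, ...).
      val session = buildSession()
      val writer = new WriterStandIn(key, session)
      writers.put(key, writer)
      writer
    }
  }
}
```

Keeping the name validation ahead of the session lookup also means an invalid table name never reaches the builder.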
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1867#discussion_r170958463
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Create CarbonStreamSparkStreamingWriter for stream table
--- End diff --

Done