Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157426573 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], 
+ classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( + options.getOrElse("dbName", "default"), options("tableName")) +// table = CarbonTable.buildFromTableInfo(table.getTableInfo, true) --- End diff -- remove it --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157426782 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], 
+ classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( --- End diff -- Can you use CarbonEnv.getCarbonTable --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157427128 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, --- End diff -- move parameter to next line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157433794 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], 
+ classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( + options.getOrElse("dbName", "default"), options("tableName")) +// table = CarbonTable.buildFromTableInfo(table.getTableInfo, true) --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157433815 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], 
+ classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r157433873 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/866/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2091/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1654 retest this please --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1654 retest this please --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1654 SDV Build Failed, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2389/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/874/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2099/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/877/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1654 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2102/ --- |
In reply to this post by qiuchenjian-2
|
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1654#discussion_r170155099 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -339,28 +345,47 @@ private static void setFileHeader(Configuration configuration, CarbonLoadModel m model.setCsvHeaderColumns(columns); } - private static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> { + public static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> { private CarbonOutputIteratorWrapper iteratorWrapper; private DataLoadExecutor dataLoadExecutor; + private CarbonLoadModel loadModel; + + private ExecutorService executorService; + + private Future future; + public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper, - DataLoadExecutor dataLoadExecutor) { + DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, + ExecutorService executorService) { this.iteratorWrapper = iteratorWrapper; this.dataLoadExecutor = dataLoadExecutor; + this.loadModel = loadModel; + this.executorService = executorService; + this.future = future; } - @Override - public void write(NullWritable aVoid, StringArrayWritable strings) + @Override public void write(NullWritable aVoid, StringArrayWritable strings) throws InterruptedException { iteratorWrapper.write(strings.get()); } - @Override - public void close(TaskAttemptContext taskAttemptContext) { + @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { iteratorWrapper.close(); - dataLoadExecutor.close(); + try { + future.get(); + } catch (ExecutionException e) { + throw new InterruptedException(e.getMessage()); --- End diff -- Why did you throw InterruptedException? --- |
Free forum by Nabble | Edit this page |