Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157346945 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -316,16 +332,35 @@ case class CarbonLoadDataCommand( } else { dataFrame } - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - columnar, - partitionStatus, - server, - isOverwriteTable, - hadoopConf, - loadDataFrame, - updateModel, - operationContext) + + if (carbonTable.isStandardPartitionTable) { + try { + loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + } finally { + server match { + case Some(dictServer) => + try { + dictServer.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getCarbonTableIdentifier.getTableId) + } catch { + case _: Exception => + throw new Exception("Dataload failed due to error while writing dictionary file!") + } + case _ => + } + } + } else { + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, --- End diff -- ok --- |
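The change above keeps the dictionary flush on the standard-partition path by placing it in a finally block. A condensed sketch of that pattern, using `Option.foreach` in place of the `match`; `server`, `carbonLoadModel`, `loadStandardPartition` and the other names are the ones from the diff:

```scala
try {
  loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
} finally {
  // Whether or not the load succeeded, ask the dictionary server (if one was
  // started for single-pass loading) to persist the generated dictionary.
  server.foreach { dictServer =>
    try {
      dictServer.writeTableDictionary(
        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
          .getCarbonTableIdentifier.getTableId)
    } catch {
      case _: Exception =>
        throw new Exception("Dataload failed due to error while writing dictionary file!")
    }
  }
}
```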

Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157346996 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand( } else { (dataFrame, dataFrame) } - if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + if (!table.isChildDataMap) { GlobalDictionaryUtil.generateGlobalDictionary( sparkSession.sqlContext, carbonLoadModel, hadoopConf, dictionaryDataFrame) } - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - columnar, - partitionStatus, - None, - isOverwriteTable, - hadoopConf, - loadDataFrame, - updateModel, - operationContext) + if (table.isStandardPartitionTable) { + loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + } else { + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, + carbonLoadModel, + columnar, + partitionStatus, + None, + isOverwriteTable, + hadoopConf, + loadDataFrame, + updateModel, + operationContext) + } + } + + private def loadStandardPartition(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + dataFrame: Option[DataFrame]) = { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val logicalPlan = + sparkSession.sessionState.catalog.lookupRelation( + TableIdentifier(table.getTableName, Some(table.getDatabaseName))) + val relation = logicalPlan.collect { + case l: LogicalRelation => l + }.head + + + val query: LogicalPlan = if (dataFrame.isDefined) { + var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT + val dateFormat = new SimpleDateFormat(dateFormatString) + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val serializationNullFormat = + carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) + } + LogicalRDD(attributes, rdd)(sparkSession) + + } else { + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + // input data from csv files. 
Convert to logical plan + CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) + hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val attributes = + StructType(carbonLoadModel.getCsvHeaderColumns.map( + StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map{f => + relation.output.find(_.name.equalsIgnoreCase(f.name)) match { + case Some(attr) => attr.dataType + case _ => StringType + } + } + val len = rowDataTypes.length + val rdd = + new NewHadoopRDD[NullWritable, StringArrayWritable]( + sparkSession.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + jobConf).map{f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + // TODO find a way to avoid double conversion of date and time. + data(i) = CarbonScalaUtil.getString( + f._2.get()(i), + rowDataTypes(i), + timeStampFormat, + dateFormat) + i = i + 1 + } + InternalRow.fromSeq(data) + } + + // Only select the required columns + Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get), + LogicalRDD(attributes, rdd)(sparkSession)) + } + Dataset.ofRows(sparkSession, InsertIntoTable( --- End diff -- ok --- |
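Both branches of the new code build rows the same way: preallocate an `Array[Any]`, fill it in a while loop, and wrap it in an `InternalRow`. A minimal self-contained sketch of that pattern, with the Carbon-specific per-column conversion replaced by a plain `UTF8String` conversion (the object and helper names are illustrative, not from the PR):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

object RowConversionSketch {
  // Convert one parsed CSV row (raw string columns) into an InternalRow.
  // The real code additionally applies the serialization null format, complex
  // delimiters and date/timestamp formats per column via CarbonScalaUtil.getString.
  def toInternalRow(columns: Array[String]): InternalRow = {
    val data = new Array[Any](columns.length)
    var i = 0
    while (i < columns.length) {
      data(i) = UTF8String.fromString(columns(i))
      i += 1
    }
    InternalRow.fromSeq(data)
  }
}
```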
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157346999 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand( } else { (dataFrame, dataFrame) } - if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + if (!table.isChildDataMap) { GlobalDictionaryUtil.generateGlobalDictionary( sparkSession.sqlContext, carbonLoadModel, hadoopConf, dictionaryDataFrame) } - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - columnar, - partitionStatus, - None, - isOverwriteTable, - hadoopConf, - loadDataFrame, - updateModel, - operationContext) + if (table.isStandardPartitionTable) { + loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + } else { + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, + carbonLoadModel, + columnar, + partitionStatus, + None, + isOverwriteTable, + hadoopConf, + loadDataFrame, + updateModel, + operationContext) + } + } + + private def loadStandardPartition(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + dataFrame: Option[DataFrame]) = { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val logicalPlan = + sparkSession.sessionState.catalog.lookupRelation( + TableIdentifier(table.getTableName, Some(table.getDatabaseName))) + val relation = logicalPlan.collect { + case l: LogicalRelation => l + }.head + + + val query: LogicalPlan = if (dataFrame.isDefined) { + var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT + val dateFormat = new SimpleDateFormat(dateFormatString) + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val serializationNullFormat = + carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) + } + LogicalRDD(attributes, rdd)(sparkSession) + + } else { + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + // input data from csv files. 
Convert to logical plan + CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) + hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val attributes = + StructType(carbonLoadModel.getCsvHeaderColumns.map( + StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map{f => + relation.output.find(_.name.equalsIgnoreCase(f.name)) match { + case Some(attr) => attr.dataType + case _ => StringType + } + } + val len = rowDataTypes.length + val rdd = + new NewHadoopRDD[NullWritable, StringArrayWritable]( + sparkSession.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + jobConf).map{f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + // TODO find a way to avoid double conversion of date and time. + data(i) = CarbonScalaUtil.getString( + f._2.get()(i), + rowDataTypes(i), + timeStampFormat, + dateFormat) + i = i + 1 + } + InternalRow.fromSeq(data) + } + + // Only select the required columns + Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get), + LogicalRDD(attributes, rdd)(sparkSession)) + } + Dataset.ofRows(sparkSession, InsertIntoTable( + convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession), + partition, + query, + OverwriteOptions(isOverwriteTable), false)) + } + + private def convertToLogicalRelation( + relation: LogicalRelation, + overWrite: Boolean, + loadModel: CarbonLoadModel, + sparkSession: SparkSession): LogicalRelation = { + val catalogTable = relation.catalogTable.get + val table = loadModel.getCarbonDataLoadSchema.getCarbonTable + val metastoreSchema = StructType(StructType.fromAttributes( + relation.output).fields.map(_.copy(dataType = StringType))) + val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions + val catalog = new CatalogFileIndex( + sparkSession, catalogTable, relation.relation.sizeInBytes) + if (lazyPruningEnabled) { + catalog + } else { + catalog.filterPartitions(Nil) // materialize all the partitions in memory + } + val partitionSchema = + StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f => + metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get)) + --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157347018 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand( } else { (dataFrame, dataFrame) } - if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + if (!table.isChildDataMap) { GlobalDictionaryUtil.generateGlobalDictionary( sparkSession.sqlContext, carbonLoadModel, hadoopConf, dictionaryDataFrame) } - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - columnar, - partitionStatus, - None, - isOverwriteTable, - hadoopConf, - loadDataFrame, - updateModel, - operationContext) + if (table.isStandardPartitionTable) { + loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + } else { + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, + carbonLoadModel, + columnar, + partitionStatus, + None, + isOverwriteTable, + hadoopConf, + loadDataFrame, + updateModel, + operationContext) + } + } + + private def loadStandardPartition(sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + hadoopConf: Configuration, + dataFrame: Option[DataFrame]) = { + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val logicalPlan = + sparkSession.sessionState.catalog.lookupRelation( + TableIdentifier(table.getTableName, Some(table.getDatabaseName))) + val relation = logicalPlan.collect { + case l: LogicalRelation => l + }.head + + + val query: LogicalPlan = if (dataFrame.isDefined) { + var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT + val dateFormat = new SimpleDateFormat(dateFormatString) + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val serializationNullFormat = + carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) + } + LogicalRDD(attributes, rdd)(sparkSession) + + } else { + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + // input data from csv files. 
Convert to logical plan + CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) + hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val attributes = + StructType(carbonLoadModel.getCsvHeaderColumns.map( + StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map{f => + relation.output.find(_.name.equalsIgnoreCase(f.name)) match { + case Some(attr) => attr.dataType + case _ => StringType + } + } + val len = rowDataTypes.length + val rdd = + new NewHadoopRDD[NullWritable, StringArrayWritable]( + sparkSession.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + jobConf).map{f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + // TODO find a way to avoid double conversion of date and time. + data(i) = CarbonScalaUtil.getString( + f._2.get()(i), + rowDataTypes(i), + timeStampFormat, + dateFormat) + i = i + 1 + } + InternalRow.fromSeq(data) + } + + // Only select the required columns + Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get), + LogicalRDD(attributes, rdd)(sparkSession)) + } + Dataset.ofRows(sparkSession, InsertIntoTable( + convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession), + partition, + query, + OverwriteOptions(isOverwriteTable), false)) + } + + private def convertToLogicalRelation( + relation: LogicalRelation, + overWrite: Boolean, + loadModel: CarbonLoadModel, + sparkSession: SparkSession): LogicalRelation = { + val catalogTable = relation.catalogTable.get + val table = loadModel.getCarbonDataLoadSchema.getCarbonTable + val metastoreSchema = StructType(StructType.fromAttributes( + relation.output).fields.map(_.copy(dataType = StringType))) + val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions + val catalog = new CatalogFileIndex( + sparkSession, catalogTable, relation.relation.sizeInBytes) + if (lazyPruningEnabled) { + catalog + } else { + catalog.filterPartitions(Nil) // materialize all the partitions in memory + } + val partitionSchema = + StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f => + metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get)) --- End diff -- ok --- |
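The `partitionSchema` built in `convertToLogicalRelation` selects, for each of the table's partition columns in declared order, the matching field of the string-typed metastore schema. A small sketch of that lookup (object and method names here are illustrative; the diff inlines this and calls `.get`, i.e. it assumes every partition column is present in the schema):

```scala
import org.apache.spark.sql.types.StructType

object PartitionSchemaSketch {
  // Walk the table's partition columns in their declared order and pull the
  // matching field out of the metastore schema, preserving that order.
  def partitionSchemaOf(metastoreSchema: StructType, partitionColumns: Seq[String]): StructType = {
    StructType(partitionColumns.map { name =>
      metastoreSchema.fields.find(_.name.equalsIgnoreCase(name)).get
    })
  }
}
```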
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/830/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157347031 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], + classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( + options.getOrElse("dbName", "default"), options("tableName")) +// table = CarbonTable.buildFromTableInfo(table.getTableInfo, true) --- End diff -- ok 
--- |
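`CarbonFileFormat` plugs into Spark's datasource API by implementing `FileFormat` and publishing the short name `carbondata` through `DataSourceRegister`. A stripped-down skeleton of that shape is below; it mirrors the two overrides shown in the diff, uses a placeholder class and short name, and assumes the usual `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` registration, which the quoted code does not show:

```scala
// Skeleton only. Like the class in the diff, it sits in Spark's
// execution.datasources package so the (internal) FileFormat trait is usable.
package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

class ExampleFileFormat extends FileFormat with DataSourceRegister with Serializable {

  // Name users would pass to DataFrameWriter.format(...); CarbonFileFormat
  // returns "carbondata" here.
  override def shortName(): String = "example"

  // Write-only source: no schema inference from existing files.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = None

  // A real implementation configures the Hadoop job (output format, committer,
  // load model) and returns a factory that creates one OutputWriter per task.
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException("sketch only")
  }
}
```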
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157347049 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.util + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util} + +class CarbonFileFormat + extends FileFormat + with DataSourceRegister + with Logging +with Serializable { + + override def shortName(): String = "carbondata" + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + None + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[CarbonOutputCommitter], + classOf[CarbonOutputCommitter]) + conf.set("carbon.commit.protocol", "carbon.commit.protocol") + sparkSession.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) + + var table = CarbonMetadata.getInstance().getCarbonTable( + options.getOrElse("dbName", "default"), options("tableName")) +// table = CarbonTable.buildFromTableInfo(table.getTableInfo, true) + val model = new 
CarbonLoadModel + val carbonProperty = CarbonProperties.getInstance() + val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) + val tableProperties = table.getTableInfo.getFactTable.getTableProperties + optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + val partitionStr = + table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map( + _.getColumnName.toLowerCase).mkString(",") + optionsFinal.put( + "fileheader", + dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr) + DataLoadingUtil.buildCarbonLoadModel( + table, + carbonProperty, + options, + optionsFinal, + model, + conf + ) + model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + model.setPartitionId("0") + model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) + model.setDictionaryServerHost(options.getOrElse("dicthost", null)) + model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) + CarbonTableOutputFormat.setLoadModel(conf, model) + CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) + + new OutputWriterFactory { + + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir + var storeLocation: Array[String] = Array[String]() + val isCarbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true") + val tmpLocationSuffix = File.separator + System.nanoTime() + if (isCarbonUseLocalDir) { + val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) { + // use single dir + storeLocation = storeLocation :+ + (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix) + if (storeLocation == null || storeLocation.isEmpty) { + storeLocation = storeLocation :+ + (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) + } + } else { + // use all the yarn dirs + storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix) + } + } else { + storeLocation = + storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) + } + CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation) + new CarbonOutputWriter(path, context, dataSchema.map(_.dataType)) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".carbondata" --- End diff -- ok --- |
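The `newInstance` method above picks per-task temporary store locations before constructing the `CarbonOutputWriter`. A condensed, self-contained sketch of that selection logic; the parameters stand in for the `CarbonProperties` flags (`carbon.use.local.dir`, multi-temp-dir) and for `Util.getConfiguredLocalDirs(SparkEnv.get.conf)`, and the function name is illustrative:

```scala
import java.io.File

import scala.util.Random

object TempStoreSketch {
  def chooseTempStoreLocations(
      yarnLocalDirs: Array[String],
      useLocalDir: Boolean,
      useMultiDir: Boolean): Array[String] = {
    // Each task gets its own sub-directory, disambiguated by nanoTime.
    val suffix = File.separator + System.nanoTime()
    if (useLocalDir && yarnLocalDirs != null && yarnLocalDirs.nonEmpty) {
      if (useMultiDir) {
        // Spread temporary load/sort files across every configured YARN local dir.
        yarnLocalDirs.map(_ + suffix)
      } else {
        // Pick a single YARN local dir at random.
        Array(yarnLocalDirs(Random.nextInt(yarnLocalDirs.length)) + suffix)
      }
    } else {
      // Fall back to the JVM temp dir.
      Array(System.getProperty("java.io.tmpdir") + suffix)
    }
  }
}
```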
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1672 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2366/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/834/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/835/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2061/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1672 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2396/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/884/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2110/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1672 SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2399/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/887/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1672 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2112/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157646660

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---

@@ -152,6 +154,22 @@ object CarbonScalaUtil {
     }
   }

+  def getString(value: String,

--- End diff --

Move each parameter to its own line, and add a comment for this function. Renaming it to `convertToUTF8String` might also be better.

---
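A hypothetical shape of what is being asked for, with the parameter list guessed from the call sites quoted earlier in the thread (only `value: String` is visible in this diff, so the remaining parameters, the wrapper object and the trivial body are assumptions):

```scala
import java.text.SimpleDateFormat

import org.apache.spark.unsafe.types.UTF8String

object CarbonScalaUtilSketch {
  /**
   * Converts a raw field value to a UTF8String for the load path, honouring the
   * configured null serialization format, complex-type delimiters and
   * date/timestamp formats.
   */
  def convertToUTF8String(
      value: String,
      serializationNullFormat: String,
      complexDelimiterLevel1: String,
      complexDelimiterLevel2: String,
      timeStampFormat: SimpleDateFormat,
      dateFormat: SimpleDateFormat): UTF8String = {
    // Placeholder body; the real implementation applies the null format,
    // delimiters and formats before producing the UTF8String.
    UTF8String.fromString(value)
  }
}
```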
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157646748

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---

@@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
       LogicalRDD(attributes, rdd)(sparkSession)
     } else {
+      var timeStampformatString = carbonLoadModel.getTimestampformat
+      if (timeStampformatString.isEmpty) {
+        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+      }
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = carbonLoadModel.getDateFormat
+      if (dateFormatString.isEmpty) {
+        dateFormatString = carbonLoadModel.getDefaultDateFormat
+      }
+      val dateFormat = new SimpleDateFormat(dateFormatString)
       // input data from csv files. Convert to logical plan
       CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
       hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
       val jobConf = new JobConf(hadoopConf)
       SparkHadoopUtil.get.addCredentials(jobConf)
-      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-        sparkSession.sparkContext,
-        classOf[CSVInputFormat],
-        classOf[NullWritable],
-        classOf[StringArrayWritable],
-        jobConf
-      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
-
       val attributes = StructType(carbonLoadModel.getCsvHeaderColumns.map(
         StructField(_, StringType))).toAttributes
+      val rowDataTypes = attributes.map{f =>

--- End diff --

Change `f` to `attribute`, and add a space after `map`.

---
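Applied to the last added line of the hunk, the suggestion looks like this (the logic is unchanged from the diff; only the binder name and the spacing around `map` differ):

```scala
val rowDataTypes = attributes.map { attribute =>
  relation.output.find(_.name.equalsIgnoreCase(attribute.name)) match {
    case Some(attr) => attr.dataType
    case _ => StringType
  }
}
```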
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157646893

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---

@@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
       LogicalRDD(attributes, rdd)(sparkSession)
     } else {
+      var timeStampformatString = carbonLoadModel.getTimestampformat
+      if (timeStampformatString.isEmpty) {
+        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+      }
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = carbonLoadModel.getDateFormat
+      if (dateFormatString.isEmpty) {
+        dateFormatString = carbonLoadModel.getDefaultDateFormat
+      }
+      val dateFormat = new SimpleDateFormat(dateFormatString)
       // input data from csv files. Convert to logical plan
       CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
       hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
       val jobConf = new JobConf(hadoopConf)
       SparkHadoopUtil.get.addCredentials(jobConf)
-      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-        sparkSession.sparkContext,
-        classOf[CSVInputFormat],
-        classOf[NullWritable],
-        classOf[StringArrayWritable],
-        jobConf
-      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
-
       val attributes = StructType(carbonLoadModel.getCsvHeaderColumns.map(
         StructField(_, StringType))).toAttributes
+      val rowDataTypes = attributes.map{f =>
+        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
+          case Some(attr) => attr.dataType
+          case _ => StringType
+        }
+      }
+      val len = rowDataTypes.length
+      val rdd =
+        new NewHadoopRDD[NullWritable, StringArrayWritable](
+          sparkSession.sparkContext,
+          classOf[CSVInputFormat],
+          classOf[NullWritable],
+          classOf[StringArrayWritable],
+          jobConf).map{f =>

--- End diff --

It would be better to use `case (key, value)` instead of `f`.

---
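Here `f` is a Hadoop key/value pair, so the suggestion is to destructure it in the pattern and ignore the `NullWritable` key. A sketch of the reworked closure; `csvRdd` stands for the `NewHadoopRDD` built in the hunk, and the per-column date/timestamp conversion from the diff is elided:

```scala
val rdd = csvRdd.map { case (_, value) =>
  // value is the StringArrayWritable holding one parsed CSV row
  InternalRow.fromSeq(value.get().map(UTF8String.fromString))
}
```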