GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/1729 [CARBONDATA-1936] Corrected bad record and avoid double conversion of data in Partitioning table Currently, the data is converted once while the RDD is created during data loading, to make sure the partitions are added in the right format. But this approach causes a problem for bad record handling, because writing bad records is not possible from the RDD. In this PR we do not convert the data in the RDD; instead we convert it while adding the partition information to Hive. Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [X] Any interfaces changed? NO - [X] Any backward compatibility impacted? NO - [X] Document update required? NO - [X] Testing done Tests added - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata partition-badrecord Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1729.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1729 ---- commit d15b12f78f679a74889ee27b5cdcfb5758b7a902 Author: ravipesala <ravi.pesala@...> Date: 2017-12-27T09:15:36Z Corrected bad record and avoid double conversion of data ---- --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1151/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2368/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1729 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2579/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r158954562 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand( // converted to hive standard fomat to let spark understand the data to partition. val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val failAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val ignoreAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore") - val query: LogicalPlan = if (dataFrame.isDefined) { - var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT - val dateFormat = new SimpleDateFormat(dateFormatString) - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val attributes = - StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val len = attributes.length - val rdd = dataFrame.get.rdd.map { f => - val data = new Array[Any](len) - var i = 0 - while (i < len) { - data(i) = - UTF8String.fromString( - CarbonScalaUtil.getString(f.get(i), - serializationNullFormat, - delimiterLevel1, - delimiterLevel2, - timeStampFormat, - dateFormat)) - i = i + 1 + val badRecordAction = + carbonLoadModel.getBadRecordsAction.split(",")(1) + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new 
SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + timeStampformatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, + serializationNullFormat) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordAction) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)) + try { + val query: LogicalPlan = if (dataFrame.isDefined) { + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) } - InternalRow.fromSeq(data) - } - if (updateModel.isDefined) { - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) - // In case of update, we don't need the segmrntid column in case of partitioning - val dropAttributes = attributes.dropRight(1) - val finalOutput = catalogTable.schema.map { attr => - dropAttributes.find { d => - val index = d.name.lastIndexOf("-updatedColumn") - if (index > 0) { - d.name.substring(0, index).equalsIgnoreCase(attr.name) - } else { - 
d.name.equalsIgnoreCase(attr.name) - } - }.get + if (updateModel.isDefined) { --- End diff -- It is better to split this loading function into two functions, one for loading and another for update. It is hard to read since it mixes two flows. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r158954662 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand( // converted to hive standard fomat to let spark understand the data to partition. val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val failAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val ignoreAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore") - val query: LogicalPlan = if (dataFrame.isDefined) { - var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT - val dateFormat = new SimpleDateFormat(dateFormatString) - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val attributes = - StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val len = attributes.length - val rdd = dataFrame.get.rdd.map { f => - val data = new Array[Any](len) - var i = 0 - while (i < len) { - data(i) = - UTF8String.fromString( - CarbonScalaUtil.getString(f.get(i), - serializationNullFormat, - delimiterLevel1, - delimiterLevel2, - timeStampFormat, - dateFormat)) - i = i + 1 + val badRecordAction = + carbonLoadModel.getBadRecordsAction.split(",")(1) + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new 
SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + timeStampformatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, + serializationNullFormat) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordAction) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)) + try { + val query: LogicalPlan = if (dataFrame.isDefined) { + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) } - InternalRow.fromSeq(data) - } - if (updateModel.isDefined) { - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) - // In case of update, we don't need the segmrntid column in case of partitioning - val dropAttributes = attributes.dropRight(1) - val finalOutput = catalogTable.schema.map { attr => - dropAttributes.find { d => - val index = d.name.lastIndexOf("-updatedColumn") - if (index > 0) { - d.name.substring(0, index).equalsIgnoreCase(attr.name) - } else { - 
d.name.equalsIgnoreCase(attr.name) - } - }.get + if (updateModel.isDefined) { + sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + // In case of update, we don't need the segmrntid column in case of partitioning + val dropAttributes = attributes.dropRight(1) + val finalOutput = catalogTable.schema.map { attr => + dropAttributes.find { d => + val index = d.name.lastIndexOf("-updatedColumn") + if (index > 0) { + d.name.substring(0, index).equalsIgnoreCase(attr.name) + } else { + d.name.equalsIgnoreCase(attr.name) + } + }.get + } + Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) + } else { + LogicalRDD(attributes, rdd)(sparkSession) } - Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) - } else { - LogicalRDD(attributes, rdd)(sparkSession) - } - } else { - var timeStampformatString = carbonLoadModel.getTimestampformat - if (timeStampformatString.isEmpty) { - timeStampformatString = carbonLoadModel.getDefaultTimestampFormat - } - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = carbonLoadModel.getDateFormat - if (dateFormatString.isEmpty) { - dateFormatString = carbonLoadModel.getDefaultDateFormat - } - val dateFormat = new SimpleDateFormat(dateFormatString) - // input data from csv files. Convert to logical plan - CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) - hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) - val jobConf = new JobConf(hadoopConf) - SparkHadoopUtil.get.addCredentials(jobConf) - val attributes = - StructType(carbonLoadModel.getCsvHeaderColumns.map( - StructField(_, StringType))).toAttributes - val rowDataTypes = attributes.map { attribute => - catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { - case Some(attr) => attr.dataType - case _ => StringType + } else { + // input data from csv files. 
Convert to logical plan + CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) + hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val attributes = + StructType(carbonLoadModel.getCsvHeaderColumns.map( + StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map { attribute => + catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { + case Some(attr) => attr.dataType + case _ => StringType + } } - } - val len = rowDataTypes.length - // Fail row conversion if fail/ignore badrecord action is enabled - val fail = failAction || ignoreAction - var rdd = - new NewHadoopRDD[NullWritable, StringArrayWritable]( - sparkSession.sparkContext, - classOf[CSVInputFormat], - classOf[NullWritable], - classOf[StringArrayWritable], - jobConf).map{ case (key, value) => + val len = rowDataTypes.length + var rdd = + new NewHadoopRDD[NullWritable, StringArrayWritable]( + sparkSession.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + jobConf).map { case (key, value) => val data = new Array[Any](len) var i = 0 val input = value.get() val inputLen = Math.min(input.length, len) - try { - while (i < inputLen) { - // TODO find a way to avoid double conversion of date and time. - data(i) = CarbonScalaUtil.convertToUTF8String( - input(i), - rowDataTypes(i), - timeStampFormat, - dateFormat, - serializationNullFormat, - fail) - i = i + 1 - } - InternalRow.fromSeq(data) - } catch { - case e: Exception => - if (failAction) { - // It is badrecord fail case. - throw new BadRecordFoundException( - s"Data load failed due to bad record: " + - s"${input(i)} with datatype ${rowDataTypes(i)}") - } else { - // It is bad record ignore case - InternalRow.empty - } + while (i < inputLen) { + // TODO find a way to avoid double conversion of date and time. 
+ data(i) = UTF8String.fromString(input(i)) + i = i + 1 } + InternalRow.fromSeq(data) + } + // Only select the required columns + val output = if (partition.nonEmpty) { + catalogTable.schema.map { attr => + attributes.find(_.name.equalsIgnoreCase(attr.name)).get + }.filter(attr => partition.get(attr.name).isEmpty) + } else { + catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) } - // In bad record ignore case filter the empty values - if (ignoreAction) { - rdd = rdd.filter(f => f.numFields != 0) + Project(output, LogicalRDD(attributes, rdd)(sparkSession)) } - - // Only select the required columns - val output = if (partition.nonEmpty) { - catalogTable.schema.map{ attr => - attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => partition.get(attr.name).isEmpty) + // TODO need to find a way to avoid double lookup + val sizeInBytes = + CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation( + catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes + val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) + val convertRelation = convertToLogicalRelation( + catalogTable, + sizeInBytes, + isOverwriteTable, + carbonLoadModel, + sparkSession) + val convertedPlan = + CarbonReflectionUtils.getInsertIntoCommand( + convertRelation, + partition, + query, + false, --- End diff -- please give variable name --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2463/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1239/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1729 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2642/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159204180 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand( // converted to hive standard fomat to let spark understand the data to partition. val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val failAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val ignoreAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore") - val query: LogicalPlan = if (dataFrame.isDefined) { - var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT - val dateFormat = new SimpleDateFormat(dateFormatString) - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val attributes = - StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val len = attributes.length - val rdd = dataFrame.get.rdd.map { f => - val data = new Array[Any](len) - var i = 0 - while (i < len) { - data(i) = - UTF8String.fromString( - CarbonScalaUtil.getString(f.get(i), - serializationNullFormat, - delimiterLevel1, - delimiterLevel2, - timeStampFormat, - dateFormat)) - i = i + 1 + val badRecordAction = + carbonLoadModel.getBadRecordsAction.split(",")(1) + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new 
SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + timeStampformatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, + serializationNullFormat) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordAction) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)) + try { + val query: LogicalPlan = if (dataFrame.isDefined) { + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) } - InternalRow.fromSeq(data) - } - if (updateModel.isDefined) { - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) - // In case of update, we don't need the segmrntid column in case of partitioning - val dropAttributes = attributes.dropRight(1) - val finalOutput = catalogTable.schema.map { attr => - dropAttributes.find { d => - val index = d.name.lastIndexOf("-updatedColumn") - if (index > 0) { - d.name.substring(0, index).equalsIgnoreCase(attr.name) - } else { - 
d.name.equalsIgnoreCase(attr.name) - } - }.get + if (updateModel.isDefined) { --- End diff -- It is hard to split, because the update scenario only needs a small piece of code added in the middle of the method to update the query plan. If we try to implement two different methods, a lot of duplicate code would have to be added. So I have separated the update code into a new method for better readability. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159204374 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand( // converted to hive standard fomat to let spark understand the data to partition. val serializationNullFormat = carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val failAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val ignoreAction = - carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore") - val query: LogicalPlan = if (dataFrame.isDefined) { - var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT - val dateFormat = new SimpleDateFormat(dateFormatString) - val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 - val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 - val serializationNullFormat = - carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val attributes = - StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes - val len = attributes.length - val rdd = dataFrame.get.rdd.map { f => - val data = new Array[Any](len) - var i = 0 - while (i < len) { - data(i) = - UTF8String.fromString( - CarbonScalaUtil.getString(f.get(i), - serializationNullFormat, - delimiterLevel1, - delimiterLevel2, - timeStampFormat, - dateFormat)) - i = i + 1 + val badRecordAction = + carbonLoadModel.getBadRecordsAction.split(",")(1) + var timeStampformatString = carbonLoadModel.getTimestampformat + if (timeStampformatString.isEmpty) { + timeStampformatString = carbonLoadModel.getDefaultTimestampFormat + } + val timeStampFormat = new 
SimpleDateFormat(timeStampformatString) + var dateFormatString = carbonLoadModel.getDateFormat + if (dateFormatString.isEmpty) { + dateFormatString = carbonLoadModel.getDefaultDateFormat + } + val dateFormat = new SimpleDateFormat(dateFormatString) + CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + timeStampformatString) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT, + serializationNullFormat) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordAction) + CarbonSession.threadSet( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)) + try { + val query: LogicalPlan = if (dataFrame.isDefined) { + val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 + val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 + val attributes = + StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes + val len = attributes.length + val rdd = dataFrame.get.rdd.map { f => + val data = new Array[Any](len) + var i = 0 + while (i < len) { + data(i) = + UTF8String.fromString( + CarbonScalaUtil.getString(f.get(i), + serializationNullFormat, + delimiterLevel1, + delimiterLevel2, + timeStampFormat, + dateFormat)) + i = i + 1 + } + InternalRow.fromSeq(data) } - InternalRow.fromSeq(data) - } - if (updateModel.isDefined) { - sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) - // In case of update, we don't need the segmrntid column in case of partitioning - val dropAttributes = attributes.dropRight(1) - val finalOutput = catalogTable.schema.map { attr => - dropAttributes.find { d => - val index = d.name.lastIndexOf("-updatedColumn") - if (index > 0) { - d.name.substring(0, index).equalsIgnoreCase(attr.name) - } else { - 
d.name.equalsIgnoreCase(attr.name) - } - }.get + if (updateModel.isDefined) { + sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null) + // In case of update, we don't need the segmrntid column in case of partitioning + val dropAttributes = attributes.dropRight(1) + val finalOutput = catalogTable.schema.map { attr => + dropAttributes.find { d => + val index = d.name.lastIndexOf("-updatedColumn") + if (index > 0) { + d.name.substring(0, index).equalsIgnoreCase(attr.name) + } else { + d.name.equalsIgnoreCase(attr.name) + } + }.get + } + Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) + } else { + LogicalRDD(attributes, rdd)(sparkSession) } - Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession)) - } else { - LogicalRDD(attributes, rdd)(sparkSession) - } - } else { - var timeStampformatString = carbonLoadModel.getTimestampformat - if (timeStampformatString.isEmpty) { - timeStampformatString = carbonLoadModel.getDefaultTimestampFormat - } - val timeStampFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = carbonLoadModel.getDateFormat - if (dateFormatString.isEmpty) { - dateFormatString = carbonLoadModel.getDefaultDateFormat - } - val dateFormat = new SimpleDateFormat(dateFormatString) - // input data from csv files. Convert to logical plan - CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) - hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) - val jobConf = new JobConf(hadoopConf) - SparkHadoopUtil.get.addCredentials(jobConf) - val attributes = - StructType(carbonLoadModel.getCsvHeaderColumns.map( - StructField(_, StringType))).toAttributes - val rowDataTypes = attributes.map { attribute => - catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { - case Some(attr) => attr.dataType - case _ => StringType + } else { + // input data from csv files. 
Convert to logical plan + CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel) + hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val attributes = + StructType(carbonLoadModel.getCsvHeaderColumns.map( + StructField(_, StringType))).toAttributes + val rowDataTypes = attributes.map { attribute => + catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match { + case Some(attr) => attr.dataType + case _ => StringType + } } - } - val len = rowDataTypes.length - // Fail row conversion if fail/ignore badrecord action is enabled - val fail = failAction || ignoreAction - var rdd = - new NewHadoopRDD[NullWritable, StringArrayWritable]( - sparkSession.sparkContext, - classOf[CSVInputFormat], - classOf[NullWritable], - classOf[StringArrayWritable], - jobConf).map{ case (key, value) => + val len = rowDataTypes.length + var rdd = + new NewHadoopRDD[NullWritable, StringArrayWritable]( + sparkSession.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + jobConf).map { case (key, value) => val data = new Array[Any](len) var i = 0 val input = value.get() val inputLen = Math.min(input.length, len) - try { - while (i < inputLen) { - // TODO find a way to avoid double conversion of date and time. - data(i) = CarbonScalaUtil.convertToUTF8String( - input(i), - rowDataTypes(i), - timeStampFormat, - dateFormat, - serializationNullFormat, - fail) - i = i + 1 - } - InternalRow.fromSeq(data) - } catch { - case e: Exception => - if (failAction) { - // It is badrecord fail case. - throw new BadRecordFoundException( - s"Data load failed due to bad record: " + - s"${input(i)} with datatype ${rowDataTypes(i)}") - } else { - // It is bad record ignore case - InternalRow.empty - } + while (i < inputLen) { + // TODO find a way to avoid double conversion of date and time. 
+ data(i) = UTF8String.fromString(input(i)) + i = i + 1 } + InternalRow.fromSeq(data) + } + // Only select the required columns + val output = if (partition.nonEmpty) { + catalogTable.schema.map { attr => + attributes.find(_.name.equalsIgnoreCase(attr.name)).get + }.filter(attr => partition.get(attr.name).isEmpty) + } else { + catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) } - // In bad record ignore case filter the empty values - if (ignoreAction) { - rdd = rdd.filter(f => f.numFields != 0) + Project(output, LogicalRDD(attributes, rdd)(sparkSession)) } - - // Only select the required columns - val output = if (partition.nonEmpty) { - catalogTable.schema.map{ attr => - attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => partition.get(attr.name).isEmpty) + // TODO need to find a way to avoid double lookup + val sizeInBytes = + CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation( + catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes + val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) + val convertRelation = convertToLogicalRelation( + catalogTable, + sizeInBytes, + isOverwriteTable, + carbonLoadModel, + sparkSession) + val convertedPlan = + CarbonReflectionUtils.getInsertIntoCommand( + convertRelation, + partition, + query, + false, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1256/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1729 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2480/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369206 --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java --- @@ -99,6 +99,15 @@ public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS = "carbon.options.global.sort.partitions"; + /** + * specify serialization null format --- End diff -- Please describe the purpose of this property. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369485 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -135,6 +136,25 @@ class CarbonSessionCatalog( sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive } + override def createPartitions(tableName: TableIdentifier, --- End diff -- move tableName to next line --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369458 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -142,6 +143,25 @@ class CarbonSessionCatalog( .asInstanceOf[HiveExternalCatalog].client } + override def createPartitions(tableName: TableIdentifier, --- End diff -- move tableName to next line --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159369856 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala --- @@ -156,54 +158,156 @@ object CarbonScalaUtil { } /** - * Converts incoming value to UTF8String after converting data as per the data type. + * Converts incoming value to String after converting data as per the data type. * @param value Input value to convert - * @param dataType Datatype to convert and then convert to UTF8String + * @param dataType Datatype to convert and then convert to String * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes - * @param dateFormat DataFormat to convert incase of DateType datatype + * @param dateFormat DataFormat to convert in case of DateType datatype * @param serializationNullFormat if this encounters in input data then data will * be treated as null - * @param fail If it is true then any conversion error will trhow error otherwise it will be - * filled with ull value - * @return converted UTF8String + * @return converted String */ - def convertToUTF8String(value: String, + def convertToString(value: String, --- End diff -- move parameter to next line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468449 --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java --- @@ -99,6 +99,15 @@ public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS = "carbon.options.global.sort.partitions"; + /** + * specify serialization null format --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468640 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -142,6 +143,25 @@ class CarbonSessionCatalog( .asInstanceOf[HiveExternalCatalog].client } + override def createPartitions(tableName: TableIdentifier, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159468830 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -135,6 +136,25 @@ class CarbonSessionCatalog( sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive } + override def createPartitions(tableName: TableIdentifier, --- End diff -- ok --- |
Free forum by Nabble | Edit this page |