[ https://issues.apache.org/jira/browse/CARBONDATA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458283#comment-15458283 ] ASF GitHub Bot commented on CARBONDATA-202: ------------------------------------------- Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/122#discussion_r77332668 --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala --- @@ -588,30 +588,59 @@ object GlobalDictionaryUtil extends Logging { allDictionaryPath: String) = { var allDictionaryRdd: RDD[(String, Iterable[String])] = null try { - // read local dictionary file, and spilt (columnIndex, columnValue) - val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath) - .map(x => { + // parse record and validate record + def parseRecord(x: String, accum: Accumulator[Int]) : (String, String) = { val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR) - if (tokens.size != 2) { - logError("Read a bad dictionary record: " + x) - } - var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME + var columnName: String = "" var value: String = "" - try { - columnName = csvFileColumns(tokens(0).toInt) - value = tokens(1) - } catch { - case ex: Exception => - logError("Reset bad dictionary record as default value") + // such as "," , "", throw ex + if (tokens.size == 0) { + logError("Read a bad dictionary record: " + x) + accum += 1 + } else if (tokens.size == 1) { + // such as "1", "jone", throw ex + if (x.contains(",") == false) { + accum += 1 + } else { + try { + columnName = csvFileColumns(tokens(0).toInt) + } catch { + case ex: Exception => + logError("Read a bad dictionary record: " + x) + accum += 1 + } + } + } else { + try { + columnName = csvFileColumns(tokens(0).toInt) + value = tokens(1) + } catch { + case ex: Exception => + logError("Read a bad dictionary record: " + x) + accum += 1 + } } (columnName, value) - }) + } + val 
accumulator = sqlContext.sparkContext.accumulator(0) + // read local dictionary file, and spilt (columnIndex, columnValue) + val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath) + .map(x => parseRecord(x, accumulator)).persist() + // for accumulator updates performed inside actions only + basicRdd.count() --- End diff -- Again, we are calling an extra action here (`basicRdd.count()`), which is not necessary. > Exception thrown in Beeline for data loading when dictionary file content is not in correct format > -------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-202 > URL: https://issues.apache.org/jira/browse/CARBONDATA-202 > Project: CarbonData > Issue Type: Bug > Reporter: Gin-zhj > Assignee: Gin-zhj > Priority: Minor > > Exception thrown in Beeline for data loading when dictionary file content is not in correct format -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |