GitHub user mohammadshahidkhan opened a pull request:
https://github.com/apache/carbondata/pull/2466

[WIP][CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse.

Be sure to complete the following checklist to help us incorporate your contribution quickly and easily:

 - [ ] Any interfaces changed?
 - [ ] Any backward compatibility impacted?
 - [ ] Document update required?
 - [ ] Testing done. Please provide details on:
       - Whether new unit test cases have been added, or why no new tests are required?
       - How it is tested? Please attach the test report.
       - Is it a performance-related change? Please attach the performance test report.
       - Any additional information to help reviewers in testing this change.
 - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mohammadshahidkhan/incubator-carbondata refactor_spark_integration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2466.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2466

----

commit 47ffeff1293f60c572985500bf875fa88b34931f
Author: mohammadshahidkhan <mohdshahidkhan1987@...>
Date:   2018-07-09T10:38:47Z

    [CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse.

----

---
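The gist of the refactor: CREATE TABLE validation that previously lived inside the Spark-version-specific SQL AST builder moves into a shared utility (CarbonSparkSqlParserUtil) so other integration code can reuse it. A minimal, self-contained Scala sketch of that pattern follows; every name in it is an illustrative placeholder, not the actual CarbonData API.

    // Minimal sketch of the extract-to-utility pattern (placeholder names,
    // not the real CarbonData API): shared validation lives in one object,
    // and each Spark-version-specific AST builder delegates to it.
    object ParserUtilSketch {
      def validateColumns(colNames: Seq[String]): Unit = {
        // Reject duplicate column names, as the CREATE TABLE path must.
        val duplicates = colNames.groupBy(identity).collect {
          case (name, occurrences) if occurrences.size > 1 => name
        }
        require(duplicates.isEmpty,
          s"Duplicated column names: ${duplicates.mkString("[", ",", "]")}")
      }
    }

    // Both builders reuse the same validation instead of duplicating it.
    class Spark21AstBuilderSketch {
      def visitCreateTable(colNames: Seq[String]): Unit =
        ParserUtilSketch.validateColumns(colNames)
    }

    class Spark22AstBuilderSketch {
      def visitCreateTable(colNames: Seq[String]): Unit =
        ParserUtilSketch.validateColumns(colNames)
    }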
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6951/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5734/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2466

SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5721/

---
Github user mohammadshahidkhan commented on the issue:
https://github.com/apache/carbondata/pull/2466

retest SDV please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7007/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2466

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5759/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5785/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2466

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5762/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7054/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5831/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2466

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5798/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5868/

---
Github user mohammadshahidkhan commented on the issue:
https://github.com/apache/carbondata/pull/2466

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5977/

---
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2466#discussion_r202576876

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
@@ -169,220 +128,45 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       provider) = createTableTuple
     val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
-
-    // TODO: implement temporary tables
-    if (temp) {
-      throw new ParseException(
-        "CREATE TEMPORARY TABLE is not supported yet. " +
-        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
-    }
-    if (skewSpecContext != null) {
-      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
-    }
-    if (bucketSpecContext != null) {
-      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
-    }
-
-    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
-    val properties = getPropertyKeyValues(tablePropertyList)
-
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of " +
-        s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns)
-    }
-
-    val tablePath = if (locationSpecContext != null) {
+    val cols: Seq[StructField] = Option(columns).toSeq.flatMap(visitColTypeList)
+    val colNames: Seq[String] = CarbonSparkSqlParserUtil
+      .validateCreateTableReqAndGetColumns(tableHeader,
+        skewSpecContext,
+        bucketSpecContext,
+        columns,
+        cols,
+        tableIdentifier,
+        temp)
+    val tablePath: Option[String] = if (locationSpecContext != null) {
       Some(visitLocationSpec(locationSpecContext))
     } else {
       None
     }

     val tableProperties = mutable.Map[String, String]()
+    val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList)
     properties.foreach{property => tableProperties.put(property._1, property._2)}

     // validate partition clause
     val (partitionByStructFields, partitionFields) =
       validatePartitionFields(partitionColumns, colNames, tableProperties)

-    // validate partition clause
-    if (partitionFields.nonEmpty) {
-      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
-        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
-      }
-      // partition columns should not be part of the schema
-      val badPartCols = partitionFields
-        .map(_.partitionColumn.toLowerCase)
-        .toSet
-        .intersect(colNames.map(_.toLowerCase).toSet)
-
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"),
-          partitionColumns)
-      }
-    }
-
-    val options = new CarbonOption(properties)
-    // validate streaming property
-    validateStreamingProperty(options)
-    var fields = parser.getFields(cols ++ partitionByStructFields)
     // validate for create table as select
     val selectQuery = Option(query).map(plan)
-    selectQuery match {
-      case Some(q) =>
-        // create table as select does not allow creation of partitioned table
-        if (partitionFields.nonEmpty) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-            "create a partitioned table using Carbondata file formats."
-          operationNotAllowed(errorMessage, partitionColumns)
-        }
-        // create table as select does not allow to explicitly specify schema
-        if (fields.nonEmpty) {
-          operationNotAllowed(
-            "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
-        }
-        // external table is not allow
-        if (external) {
-          operationNotAllowed("Create external table as select", tableHeader)
-        }
-        fields = parser
-          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
-            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
-      case _ =>
-        // ignore this case
-    }
-    if (partitionFields.nonEmpty && options.isStreaming) {
-      operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
-    }
-    // validate tblProperties
-    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-    var isTransactionalTable : Boolean = true
-
-    val tableInfo = if (external) {
-      // read table info from schema file in the provided table path
-      // external table also must convert table name to lower case
-      val identifier = AbsoluteTableIdentifier.from(
-        tablePath.get,
-        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
-        tableIdentifier.table.toLowerCase())
-      val table = try {
-        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
-          if (provider.equalsIgnoreCase("'carbonfile'")) {
-            SchemaReader.inferSchema(identifier, true)
-          } else {
-            isTransactionalTable = false
-            SchemaReader.inferSchema(identifier, false)
-          }
-        }
-        else {
-          SchemaReader.getTableInfo(identifier)
-        }
-      }
-      catch {
-        case e: Throwable =>
-          operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
-      }
-      // set "_external" property, so that DROP TABLE will not delete the data
-      if (provider.equalsIgnoreCase("'carbonfile'")) {
-        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
-        table.getFactTable.getTableProperties.put("_external", "false")
-      } else {
-        table.getFactTable.getTableProperties.put("_external", "true")
-        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
-      }
-      // setting local dictionary for all string coloumn for external table
-      var isLocalDic_enabled = table.getFactTable.getTableProperties
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
-      if (null == isLocalDic_enabled) {
-        table.getFactTable.getTableProperties
-          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
-            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
-      }
-      isLocalDic_enabled = table.getFactTable.getTableProperties
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
-      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
-          isLocalDic_enabled.toBoolean) {
-        val allcolumns = table.getFactTable.getListOfColumns
-        for (i <- 0 until allcolumns.size()) {
-          val cols = allcolumns.get(i)
-          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
-            cols.setLocalDictColumn(true)
-          }
-          allcolumns.set(i, cols)
-        }
-        table.getFactTable.setListOfColumns(allcolumns)
-      }
-
-      table
-    } else {
-      // prepare table model of the collected tokens
-      val tableModel: TableModel = parser.prepareTableModel(
-        ifNotExists,
-        convertDbNameToLowerCase(tableIdentifier.database),
-        tableIdentifier.table.toLowerCase,
-        fields,
-        partitionFields,
-        tableProperties,
-        bucketFields,
-        isAlterFlow = false,
-        false,
-        tableComment)
-      TableNewProcessor(tableModel)
-    }
-
-    tableInfo.setTransactionalTable(isTransactionalTable)
-    selectQuery match {
-      case query@Some(q) =>
-        CarbonCreateTableAsSelectCommand(
-          tableInfo = tableInfo,
-          query = query.get,
-          ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath)
-      case _ =>
-        CarbonCreateTableCommand(
-          tableInfo = tableInfo,
-          ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath,
-          external)
-    }
-  }
-
-  private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
-    try {
-      carbonOption.isStreaming
-    } catch {
-      case _: IllegalArgumentException =>
-        throw new MalformedCarbonCommandException(
-          "Table property 'streaming' should be either 'true' or 'false'")
-    }
+    val extraTableTuple = (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
+      tableProperties, properties, partitionByStructFields, partitionFields,
+      parser, sparkSession, selectQuery)
+    CarbonSparkSqlParserUtil
+      .createCarbonTable(createTableTuple, extraTableTuple)
   }

   private def validatePartitionFields(
--- End diff --

Now this method has only one line of code, so we can move that line to the caller and remove the method entirely.

---
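To make the suggestion concrete: once a private method shrinks to a single delegating line, the call can move to the call site and the wrapper can be deleted. A minimal before/after sketch in Scala, using placeholder names rather than the real validatePartitionFields signature:

    // Placeholder names only; the real method and utility differ.
    object PartitionUtilSketch {
      def validate(cols: Seq[String]): Seq[String] =
        cols.map(_.toLowerCase).distinct
    }

    class BuilderBefore {
      // One-line wrapper left over after the refactor.
      private def validateFields(cols: Seq[String]): Seq[String] =
        PartitionUtilSketch.validate(cols)

      def build(cols: Seq[String]): Seq[String] = validateFields(cols)
    }

    class BuilderAfter {
      // Wrapper removed; the caller invokes the utility directly.
      def build(cols: Seq[String]): Seq[String] =
        PartitionUtilSketch.validate(cols)
    }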
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7203/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5986/

---
Github user KanakaKumar commented on the issue:
https://github.com/apache/carbondata/pull/2466

LGTM

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2466

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7216/

---