GitHub user ndwangsen opened a pull request:
https://github.com/apache/carbondata/pull/2864 [CARBONDATA-3041] Optimize load minimum size strategy for data loading this PR modifies the following points: 1. Delete system property carbon.load.min.size.enabledï¼modified this property load_min_size_inmb to table propertyï¼and This property can also be specified in the load option. 2. Support to alter table xxx set TBLPROPERTIES('load_min_size_inmb '='256') 3. If creating a table has this property load_min_size_inmbï¼Display this property via the desc formatted command. Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? NA - [ ] Any backward compatibility impacted? NA - [ ] Document update required? YES - [ ] Testing done Test ok in our test env - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA You can merge this pull request into a Git repository by running: $ git pull https://github.com/ndwangsen/incubator-carbondata fix_load_min Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2864.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2864 ---- commit bbbe70d04cef85b2c7ab50d3f697e0d1e35efc95 Author: ndwangsen <luffy.wang@...> Date: 2018-10-27T02:38:48Z [CARBONDATA-3041]Optimize load minimum size strategy for data loading ---- --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2864 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1076/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2864 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9341/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2864 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1289/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703110 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -1171,12 +1171,27 @@ object CarbonDataRDDFactory { .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) val skewedDataOptimization = CarbonProperties.getInstance() .isLoadSkewedDataOptimizationEnabled() - val loadMinSizeOptimization = CarbonProperties.getInstance() - .isLoadMinSizeOptimizationEnabled() // get user ddl input the node loads the smallest amount of data - val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize() + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, "") + var expectedMinSizePerNode = carbonLoadModel.getLoadMinSize() --- End diff -- there is no need to add another variable `expectedMinSizePerNode`. In line 1190, we can just use `loadMinSize` to determine which branch should we go: if it is zero, use 'BLOCK_SIZE_FIRST', otherwise, use 'NODE_MIN_SIZE_FIRST'. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228702949 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala --- @@ -833,4 +833,26 @@ object CommonUtil { }) } } + + /** + * This method will validate single node minimum load data volume of table specified by the user + * + * @param tableProperties table property specified by user + * @param propertyName property name + */ + def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = { + var size: Integer = 0 + if (tableProperties.get(propertyName).isDefined) { + val loadSizeStr: String = + parsePropertyValueStringInMB(tableProperties(propertyName)) + try { + size = Integer.parseInt(loadSizeStr) --- End diff -- what about the checking for range bounds, can this be negative or zero? I think in exception scenario, you can set this value to 0, so that later you can use this as a flag (whether the value is zero) to determine whether to enable size-based-block-assignment. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703382 --- Diff: docs/ddl-of-carbondata.md --- @@ -474,7 +475,22 @@ CarbonData DDL statements are documented here,which includes: be later viewed in table description for reference. ``` - TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'') + TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords') + ``` + + - ##### Load minimum data size + This property determines whether to enable node minumun input data size allocation strategy --- End diff -- You can optimize this description like this: ``` This property indicates the minimum input data size per node for data loading. By default it is not enabled. Setting a non-zero integer value will enable this feature. This property is useful if you have a large cluster and only want a small portion of the nodes to process data loading. For example, if you have a cluster with 10 nodes and the input data is about 1GB. Without this property, each node will process about 100MB input data and result in at least 10 data files. With this property configured with 512 will, only 2 nodes will be chosen to process the input data, each with about 512MB input and result in about 2 or 4 files based on the compress ratio. Moreover, this property can also be specified in the load option. Notice that once you enable this feature, for load balance, carbondata will ignore the data locality while assigning input data to nodes, this will cause more network traffic. ``` --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703158 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -748,4 +752,18 @@ object AlterTableUtil { false } } + + private def validateLoadMinSizeProperties(carbonTable: CarbonTable, + propertiesMap: mutable.Map[String, String]): Unit = { + // validate load min size property + if (propertiesMap.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB).isDefined) { + // Cache level is not allowed for child tables and dataMaps --- End diff -- 'Cache level'? --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228702976 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala --- @@ -833,4 +833,26 @@ object CommonUtil { }) } } + + /** + * This method will validate single node minimum load data volume of table specified by the user + * + * @param tableProperties table property specified by user + * @param propertyName property name + */ + def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = { + var size: Integer = 0 + if (tableProperties.get(propertyName).isDefined) { + val loadSizeStr: String = + parsePropertyValueStringInMB(tableProperties(propertyName)) + try { + size = Integer.parseInt(loadSizeStr) + } catch { + case e: NumberFormatException => --- End diff -- once you update the check, remember to update this error message as well --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703510 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -1171,12 +1171,27 @@ object CarbonDataRDDFactory { .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) val skewedDataOptimization = CarbonProperties.getInstance() .isLoadSkewedDataOptimizationEnabled() - val loadMinSizeOptimization = CarbonProperties.getInstance() - .isLoadMinSizeOptimizationEnabled() // get user ddl input the node loads the smallest amount of data - val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize() + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala --- End diff -- It seems that you get the load-min-size only from the table property but you claimed that carbon also support specifying it through loadOption. The expected procedure is: 1. get the loadMinSize from LoadOption, if it is zero, goto step2; otherwise goto step4ï¼ 2. get it from TableProperty, if it is zero, go to step 3, otherwise goto step4; 3. use other strategy 4. use NODE_MIN_SIZE_FIRST; Have you handled this? --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703135 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala --- @@ -123,6 +123,12 @@ private[sql] case class CarbonDescribeFormattedCommand( tblProps.get(CarbonCommonConstants.LONG_STRING_COLUMNS), "")) } + // load min size info + if (tblProps.containsKey(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)) { + results ++= Seq(("Single node load min data size", --- End diff -- You can optimize this info to 'Minimum input data size per node for data loading' --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703406 --- Diff: docs/ddl-of-carbondata.md --- @@ -474,7 +475,22 @@ CarbonData DDL statements are documented here,which includes: be later viewed in table description for reference. ``` - TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'') + TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords') + ``` + + - ##### Load minimum data size + This property determines whether to enable node minumun input data size allocation strategy + for data loading.It will make sure that the node load the minimum amount of data there by + reducing number of carbondata files. This property is useful if the size of the input data + files are very small, like 1MB to 256MB. And This property can also be specified + in the load option, the property value only int value is supported. + + ``` + TBLPROPERTIES('LOAD_MIN_SIZE_INMB'='256 MB') --- End diff -- I think we can remove this and only support '256' since the property name already contains 'INMB', this will make the code simple. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228703171 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java --- @@ -186,8 +186,7 @@ optionsFinal.put("sort_scope", "local_sort"); optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", "")); optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, - Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, - CarbonCommonConstants.CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT)); + Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, "")); --- End diff -- need a space after "options," --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708250 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala --- @@ -123,6 +123,12 @@ private[sql] case class CarbonDescribeFormattedCommand( tblProps.get(CarbonCommonConstants.LONG_STRING_COLUMNS), "")) } + // load min size info + if (tblProps.containsKey(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)) { + results ++= Seq(("Single node load min data size", --- End diff -- Has been modified based on the review --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708254 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala --- @@ -833,4 +833,26 @@ object CommonUtil { }) } } + + /** + * This method will validate single node minimum load data volume of table specified by the user + * + * @param tableProperties table property specified by user + * @param propertyName property name + */ + def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = { + var size: Integer = 0 + if (tableProperties.get(propertyName).isDefined) { + val loadSizeStr: String = + parsePropertyValueStringInMB(tableProperties(propertyName)) + try { + size = Integer.parseInt(loadSizeStr) --- End diff -- Has been modified based on the review --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708257 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala --- @@ -833,4 +833,26 @@ object CommonUtil { }) } } + + /** + * This method will validate single node minimum load data volume of table specified by the user + * + * @param tableProperties table property specified by user + * @param propertyName property name + */ + def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = { + var size: Integer = 0 + if (tableProperties.get(propertyName).isDefined) { + val loadSizeStr: String = + parsePropertyValueStringInMB(tableProperties(propertyName)) + try { + size = Integer.parseInt(loadSizeStr) + } catch { + case e: NumberFormatException => --- End diff -- Has been modified based on the review --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708258 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -1171,12 +1171,27 @@ object CarbonDataRDDFactory { .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) val skewedDataOptimization = CarbonProperties.getInstance() .isLoadSkewedDataOptimizationEnabled() - val loadMinSizeOptimization = CarbonProperties.getInstance() - .isLoadMinSizeOptimizationEnabled() // get user ddl input the node loads the smallest amount of data - val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize() + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, "") + var expectedMinSizePerNode = carbonLoadModel.getLoadMinSize() --- End diff -- Has been modified based on the review --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708260 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -748,4 +752,18 @@ object AlterTableUtil { false } } + + private def validateLoadMinSizeProperties(carbonTable: CarbonTable, + propertiesMap: mutable.Map[String, String]): Unit = { + // validate load min size property + if (propertiesMap.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB).isDefined) { + // Cache level is not allowed for child tables and dataMaps --- End diff -- Has been modified based on the review --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708265 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java --- @@ -186,8 +186,7 @@ optionsFinal.put("sort_scope", "local_sort"); optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", "")); optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, - Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, - CarbonCommonConstants.CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT)); + Maps.getOrDefault(options,CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, "")); --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ndwangsen commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2864#discussion_r228708269 --- Diff: docs/ddl-of-carbondata.md --- @@ -474,7 +475,22 @@ CarbonData DDL statements are documented here,which includes: be later viewed in table description for reference. ``` - TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'') + TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords') + ``` + + - ##### Load minimum data size + This property determines whether to enable node minumun input data size allocation strategy --- End diff -- Has been modified based on the review --- |
Free forum by Nabble | Edit this page |