GitHub user jackylk opened a pull request:
https://github.com/apache/carbondata/pull/3066

[CARBONDATA-3244] Add benchmark for Change Data Capture scenario

CDC (change data capture) is a common scenario for analyzing slowly changing tables in a data warehouse. It is useful to add a benchmark comparing two update methods:
1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for Hive warehouses.
2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.

This test simulates updates to the history table using a CDC table. When run on an 8-core laptop, the benchmark shows:

1. Test one: history table with 1M records, updating 10K records and inserting 10K records every day, simulated over 3 days.
   hive_solution: total process time 13,516 ms
   carbon_solution: total process time 7,521 ms

2. Test two: history table with 10M records, updating 10K records and inserting 10K records every day, simulated over 3 days.
   hive_solution: total process time 104,250 ms
   carbon_solution: total process time 17,384 ms

- [X] Any interfaces changed? No
- [X] Any backward compatibility impacted? No
- [X] Document update required? No
- [X] Testing done. Please provide details on:
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
      Only an example is added.
- [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata cdc

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/3066.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3066

----
commit ebb5ef79ac85a6c736496fe19f719bfed74902c1
Author: Jacky Li <jacky.likun@...>
Date: 2019-01-10T16:44:58Z

    add benchmark for Change Data Capture scenario
----
---
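For readers skimming the thread, the two strategies being compared can be summarized with a minimal sketch, assuming the SparkSession named spark that the benchmark's main() creates and the dw_order/ods_order schema described below. The exact SQL inside the PR's hive_solution and carbon_solution is not fully visible in this thread, so the statements here are illustrative only.

    // Sketch of the Hive-style approach: rewrite the whole history table,
    // closing the rows touched by today's CDC data and appending the new
    // versions. With Spark, a staging table may be needed because the same
    // table is read and overwritten in one statement.
    spark.sql(
      """
        | INSERT OVERWRITE TABLE dw_order
        | SELECT o.order_id, o.customer_id, o.start_date,
        |        CASE WHEN c.order_id IS NOT NULL THEN c.update_date
        |             ELSE o.end_date END AS end_date,
        |        o.state
        | FROM dw_order o
        | LEFT JOIN ods_order c
        |   ON o.order_id = c.order_id AND o.end_date = cast('9999-01-01' as date)
        | UNION ALL
        | SELECT c.order_id, c.customer_id, c.update_date,
        |        cast('9999-01-01' as date), c.state
        | FROM ods_order c
      """.stripMargin)

    // Sketch of the CarbonData approach: update the matched rows in place
    // with CarbonData's UPDATE syntax, then append the new versions, instead
    // of rewriting the full table. Whether the outer WHERE may use EXISTS
    // depends on the CarbonData version; the PR's actual statement may differ.
    spark.sql(
      """
        | UPDATE dw_order d
        | SET (d.end_date) =
        |   (SELECT c.update_date FROM ods_order c WHERE c.order_id = d.order_id)
        | WHERE EXISTS (SELECT 1 FROM ods_order c WHERE c.order_id = d.order_id)
      """.stripMargin)
    spark.sql(
      """
        | INSERT INTO dw_order
        | SELECT order_id, customer_id, update_date,
        |        cast('9999-01-01' as date), state
        | FROM ods_order
      """.stripMargin)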
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2258/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066 Build failed with Spark 2.2.1. Please check CI: http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2477/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066 Build failed with Spark 2.3.2. Please check CI: http://136.243.101.176:8080/job/carbondataprbuilder2.3/10516/ ---
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r246977629 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.benchmark + +import java.io.File +import java.sql.Date + +import org.apache.commons.lang3.time.DateUtils +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +/** + * Benchmark for Change Data Capture scenario. + * This test simulates updates to history table using CDC table. + * + * The benchmark shows performance of two update methods: + * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for hive warehouse. + * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly. + * + * When running in a 8-cores laptop, the benchmark shows: + * + * 1. test one + * History table 1M records, update 10K records everyday and insert 10K records everyday, + * simulated 3 days. + * hive_solution: total process time takes 13,516 ms + * carbon_solution: total process time takes 7,521 ms + * + * + * 2. test two + * History table 10M records, update 10K records everyday and insert 10K records everyday, + * simulated 3 days. 
+ * hive_solution: total process time takes 104,250 ms + * carbon_solution: total process time takes 17,384 ms + * + */ +object CDCBenchmark { + + // Schema for history table + // Table name: dw_order + // +-------------+-----------+-------------+ + // | Column name | Data type | Cardinality | + // +-------------+-----------+-------------+ + // | order_id | string | 10,000,000 | + // +-------------+-----------+-------------+ + // | customer_id | string | 10,000,000 | + // +-------------+-----------+-------------+ + // | start_date | date | NA | + // +-------------+-----------+-------------+ + // | end_date | date | NA | + // +-------------+-----------+-------------+ + // | state | int | 4 | + // +-------------+-----------+-------------+ + case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date, + state: Int) + + // Schema for CDC data which is used for update to history table every day + // Table name: ods_order + // +-------------+-----------+-------------+ + // | Column name | Data type | Cardinality | + // +-------------+-----------+-------------+ + // | order_id | string | 10,000,000 | + // +-------------+-----------+-------------+ + // | customer_id | string | 10,000,000 | + // +-------------+-----------+-------------+ + // | update_date | date | NA | + // +-------------+-----------+-------------+ + // | state | int | 4 | + // +-------------+-----------+-------------+ + case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int) + + // number of records for first day + val numOrders = 10000000 + + // number of records to update every day + val numUpdateOrdersDaily = 10000 + + // number of new records to insert every day + val newNewOrdersDaily = 10000 + + // number of days to simulate + val numDays = 3 + + // print eveyday result or not to console + val printDetail = false + + def generateDataForDay0( + sparkSession: SparkSession, + numOrders: Int = 1000000, + startDate: Date = Date.valueOf("2018-05-01")): DataFrame = { + import sparkSession.implicits._ + sparkSession.sparkContext.parallelize(1 to numOrders, 4) + .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("9999-01-01"), 1) + }.toDS().toDF() + } + + def generateDailyCDC( + sparkSession: SparkSession, + numUpdatedOrders: Int, + startDate: Date, + updateDate: Date, + newState: Int, + numNewOrders: Int + ): DataFrame = { + import sparkSession.implicits._ + val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4) + .map {x => CDC(s"order$x", s"customer$x", updateDate, newState) + }.toDS().toDF() + val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4) + .map {x => CDC(s"newOrder${System.currentTimeMillis()}", s"customer$x", updateDate, 1) + }.toDS().toDF() + ds1.union(ds2) + } + + def main(args: Array[String]): Unit = { + import org.apache.spark.sql.CarbonSession._ + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + val master = Option(System.getProperty("spark.master")) + .orElse(sys.env.get("MASTER")) + .orElse(Option("local[8]")) + + val spark = SparkSession + .builder() + .master(master.get) + .enableHiveSupport() + .config("spark.driver.host", "127.0.0.1") + .getOrCreateCarbonSession(storeLocation) + spark.sparkContext.setLogLevel("warn") + + spark.sql("drop table if exists dw_order") + spark.sql("drop table if exists ods_order") + + // prepare base data for first day + val df = generateDataForDay0( + 
sparkSession = spark, + numOrders = numOrders, + startDate = Date.valueOf("2018-05-01")) + + spark.sql(s"drop table if exists dw_order") + df.write + .format("carbondata") + .option("tableName", "dw_order") + .mode(SaveMode.Overwrite) + .save() + + var startDate = Date.valueOf("2018-05-01") + var state = 2 + var updateTime = 0L + + if (printDetail) { + println("## day0") + spark.sql("select * from dw_order").show(100, false) + } + + for (i <- 1 to numDays) { + // prepare for incremental update data for day-i + val newDate = new Date(DateUtils.addDays(startDate, 1).getTime) + val cdc = generateDailyCDC( + sparkSession = spark, + numUpdatedOrders = numUpdateOrdersDaily, + startDate = startDate, + updateDate = newDate, + newState = state, + numNewOrders = newNewOrdersDaily) + cdc.write + .format("carbondata") + .option("tableName", "ods_order") + .mode(SaveMode.Overwrite) + .save() + + if (printDetail) { + println(s"day$i CDC") + spark.sql("select * from ods_order").show(100, false) + } + + // update dw table using CDC data + val start = System.nanoTime() + hive_solution(spark) + // carbon_solution(spark) --- End diff -- Why is carbon_solution commented out? If only one of hive_solution or carbon_solution is run at a time, it would be better to add a flag in "args: Array[String]" to control which one runs. ---
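A minimal sketch of the switch being requested, assuming it lives inside the CDCBenchmark object so the two existing methods are in scope; the argument values "hive" and "carbon" are illustrative assumptions, not something the PR defines.

    // Sketch only: dispatch on the first program argument instead of
    // commenting one call out in main().
    def runSolution(spark: SparkSession, args: Array[String]): Unit = {
      args.headOption.getOrElse("carbon") match {
        case "hive"   => hive_solution(spark)    // INSERT OVERWRITE approach
        case "carbon" => carbon_solution(spark)  // CarbonData UPDATE approach
        case other    => throw new IllegalArgumentException(
          s"Unknown solution '$other'; expected 'hive' or 'carbon'")
      }
    }

main() would then call runSolution(spark, args) at the point where hive_solution(spark) is called today.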
Github user qiuchenjian commented on the issue:
https://github.com/apache/carbondata/pull/3066 I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution produces more segments (each insert generates a segment, and each update generates more). Do we have a way to optimize this? ---
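A quick way to observe the segment growth described above is a hedged sketch like the following; SHOW SEGMENTS is standard CarbonData SQL, though the output columns vary across CarbonData versions.

    // Sketch: list the segments accumulated by the history table after the
    // simulated days; each daily insert adds a segment, and updates add
    // delta files on top of the existing segments.
    spark.sql("SHOW SEGMENTS FOR TABLE dw_order").show(100, false)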
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r246995496 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala --- @@ -0,0 +1,256 @@ [...] + // update dw table using CDC data + val start = System.nanoTime() + hive_solution(spark) + // carbon_solution(spark) --- End diff -- Better to add a variable so that hive_solution and carbon_solution can be run separately; switching between them with comments is confusing to others. ---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r247119761 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala --- @@ -0,0 +1,256 @@ [...] + // update dw table using CDC data + val start = System.nanoTime() + hive_solution(spark) + // carbon_solution(spark) + val end = System.nanoTime() + updateTime += end - start + + if (printDetail) { + println(s"day$i result") + spark.sql("select * from dw_order").show(100, false) + } + + startDate = newDate + state = state + 1 + } + + println(s"simulated $numDays days, total process time takes ${updateTime / 1000 / 1000} ms") + spark.close() + } + + /** + * Typical solution when using hive + * This solution uses INSERT OVERWRITE to rewrite the whole table every day + */ + private def hive_solution(spark: SparkSession) = { + spark.sql( + """ + | insert overwrite table dw_order --- End diff -- Maybe it would be better to create a table partitioned on the date column and INSERT OVERWRITE only the affected partition. ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/3066 > I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution produces more segments (each insert generates a segment, and each update generates more). > Do we have a way to optimize this? Since we are updating the existing data, it creates extra files such as delete delta and incremental CarbonData files. It may degrade query performance a little, but once compaction is done it will improve. ---
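For completeness, a hedged sketch of the compaction step the reply refers to; both statements are standard CarbonData SQL, though the compaction type and when to trigger it are workload dependent.

    // Sketch: merge segments (and with them the update deltas) so that
    // queries after the simulated days read fewer files, then clean up the
    // segments that were compacted away.
    spark.sql("ALTER TABLE dw_order COMPACT 'MINOR'")
    spark.sql("CLEAN FILES FOR TABLE dw_order")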