Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1713 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3361/ ---
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/1713 retest sdv please ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1713 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4097/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1713 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4617/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1713 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3392/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1713 SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4119/ ---
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/1713 @jackylk Please review it. ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178245698

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark will print out some information:
+ * 1.Environment information
+ * 2.Parameters information
+ * 3.concurrent query performance result using parquet format
+ * 4.concurrent query performance result using CarbonData format
+ *
+ * This benchmark default run in local model,
+ * user can change 'runInLocal' to false if want to run in cluster,
+ * user can change variables like:
+ *
+ * spark-submit \
+   --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
+   --master yarn \
+   --deploy-mode client \
+   --driver-memory 16g \
+   --executor-cores 4g \
+   --executor-memory 24g \
+   --num-executors 3 \
+   concurrencyTest.jar \
+   totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile
+ * details in initParameters method of this benchmark
+ */
+object ConcurrentQueryBenchmark {
+
+  // generate number of data
+  var totalNum = 1 * 1000 * 1000
+  // the number of thread pool
+  var threadNum = 16
+  // task number of spark sql query
+  var taskNum = 100
+  // whether is result empty, if true then result is empty
+  var resultIsEmpty = true
+  // the store path of task details
+  var path: String = "/tmp/carbondata"
+  // whether run in local or cluster
+  var runInLocal = true
+  // whether generate new file
+  var generateFile = true
+  // whether delete file
+  var deleteFile = true
+
+  val cardinalityId = 100 * 1000 * 1000
+  val cardinalityCity = 6
+
+  def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet"
+
+  def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
+
+  def carbonTableName(version: String): String =
+    "Num" + totalNum + "_" + s"comparetest_carbonV$version"
+
+  // Table schema:
+  // +-------------+-----------+-------------+-------------+------------+
+  // | Column name | Data type | Cardinality | Column type | Dictionary |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | id          | string    | 100,000,000 | dimension   | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | city        | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | country     | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | planet      | string    | 10,007      | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m1          | short     | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m2          | int       | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m3          | big int   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m4          | double    | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m5          | decimal   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  /**
+   * generate DataFrame with above table schema
+   *
+   * @param spark SparkSession
+   * @return Dataframe of test data
+   */
+  private def generateDataFrame(spark: SparkSession): DataFrame = {
--- End diff --

It is better to move this to a separate class in the benchmark package, so it can be reused by the two benchmark classes.
---
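The refactor suggested here could look like the sketch below: the generator moves into its own object so both comparison benchmarks share one copy of the test data logic. The object name `BenchmarkDataGenerator` and the explicit `totalNum` parameter are illustrative assumptions, not code from the PR.

```scala
package org.apache.carbondata.benchmark

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical shared generator, factored out of ConcurrentQueryBenchmark
// so other benchmark classes can reuse the same schema and data.
object BenchmarkDataGenerator {

  /** Generates `totalNum` rows matching the benchmark table schema. */
  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
    val rdd = spark.sparkContext
      .parallelize(1 to totalNum, 4)
      .map { x =>
        Row((x % 100000000).toString, "city" + x % 6, "country" + x % 6,
          "planet" + x % 10007, (x % 16).toShort, x / 2, (x << 1).toLong,
          x.toDouble / 13, BigDecimal.valueOf(x.toDouble / 11))
      }

    val schema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField("city", StringType, nullable = false),
      StructField("country", StringType, nullable = false),
      StructField("planet", StringType, nullable = false),
      StructField("m1", ShortType, nullable = false),
      StructField("m2", IntegerType, nullable = false),
      StructField("m3", LongType, nullable = false),
      StructField("m4", DoubleType, nullable = false),
      StructField("m5", DecimalType(30, 10), nullable = false)
    ))
    spark.createDataFrame(rdd, schema)
  }
}
```

Each benchmark would then call `BenchmarkDataGenerator.generateDataFrame(spark, totalNum)` instead of keeping a private copy of the method.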
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178246812

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
+  class QueryTask(spark: SparkSession, query: String)
+    extends Callable[Results] with Serializable {
+    override def call(): Results = {
+      var result: Array[Row] = null
+      val startTime = System.nanoTime()
+      val rt = time {
+        result = spark.sql(query).collect()
--- End diff --

It is better not to collect the result; it impacts GC a lot. I think doing a `count` is better.
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178247348

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
+      val rt = time {
+        result = spark.sql(query).collect()
+      }
+      val endTime = System.nanoTime()
+      if (resultIsEmpty) {
+        Results(rt, Array.empty[Row], count = result.length, startTime, endTime)
--- End diff --

It is better not to return the result; keeping it in memory causes a lot of GC.
---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178429053

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
+      val endTime = System.nanoTime()
+      if (resultIsEmpty) {
+        Results(rt, Array.empty[Row], count = result.length, startTime, endTime)
--- End diff --

`resultIsEmpty` defaults to true, so the result is not returned by default. If users want to see the result, for example to validate the computed output, they can set `resultIsEmpty` to false. I have tested it; GC is acceptable for small test sizes.
---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178429058

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
+  /**
+   * generate DataFrame with above table schema
+   *
+   * @param spark SparkSession
+   * @return Dataframe of test data
+   */
+  private def generateDataFrame(spark: SparkSession): DataFrame = {
--- End diff --

ok
---
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1713 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4703/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1713 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3476/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1713 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4211/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178438033 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark will print out some information:
+ * 1. Environment information
+ * 2. Parameters information
+ * 3. Concurrent query performance result using Parquet format
+ * 4. Concurrent query performance result using CarbonData format
+ *
+ * This benchmark runs in local mode by default;
+ * users can change 'runInLocal' to false to run in a cluster,
+ * and can change variables like:
+ *
+ * spark-submit \
+ --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
--- End diff -- add `*`, otherwise stylecheck will fail --- |
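For reference, a stylecheck-compliant form of the quoted scaladoc would carry a leading `*` on every continuation line, along these lines (spacing is illustrative):

 * spark-submit \
 *   --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
 *   --master yarn \
 *   --deploy-mode client \
 *   ...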
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178438035 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark will print out some information:
+ * 1. Environment information
+ * 2. Parameters information
+ * 3. Concurrent query performance result using Parquet format
+ * 4. Concurrent query performance result using CarbonData format
+ *
+ * This benchmark runs in local mode by default;
+ * users can change 'runInLocal' to false to run in a cluster,
+ * and can change variables like:
+ *
+ * spark-submit \
+ --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
+ --master yarn \
+ --deploy-mode client \
+ --driver-memory 16g \
+ --executor-cores 4 \
+ --executor-memory 24g \
+ --num-executors 3 \
+ concurrencyTest.jar \
+ totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile
+ * see the initParameters method of this benchmark for details
+ */
+object ConcurrentQueryBenchmark {
+
+  // number of rows to generate
+  var totalNum = 1 * 10 * 1000
+  // size of the query thread pool
+  var threadNum = 16
+  // number of Spark SQL query tasks
+  var taskNum = 100
+  // whether the query result should be empty; if true, queries are expected to return no rows
+  var resultIsEmpty = true
+  // path where task details are stored
+  var path: String = "/tmp/carbondata"
+  // whether to run locally or in a cluster
+  var runInLocal = true
+  // whether to generate new data files
+  var generateFile = true
+  // whether to delete the data files afterwards
+  var deleteFile = true
+
+  val cardinalityId = 100 * 1000 * 1000
+  val cardinalityCity = 6
+
+  def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet"
+
+  def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
+
+  def carbonTableName(version: String): String =
+    "Num" + totalNum + "_" + s"comparetest_carbonV$version"
+
+
--- End diff -- remove empty line --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178438050 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/GenerateData.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+object GenerateData {
--- End diff -- Rename to `DataGenerator` --- |
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178455498 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/GenerateData.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+object GenerateData {
--- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1713#discussion_r178455501 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark will print out some information:
+ * 1. Environment information
+ * 2. Parameters information
+ * 3. Concurrent query performance result using Parquet format
+ * 4. Concurrent query performance result using CarbonData format
+ *
+ * This benchmark runs in local mode by default;
+ * users can change 'runInLocal' to false to run in a cluster,
+ * and can change variables like:
+ *
+ * spark-submit \
+ --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
+ --master yarn \
+ --deploy-mode client \
+ --driver-memory 16g \
+ --executor-cores 4 \
+ --executor-memory 24g \
+ --num-executors 3 \
+ concurrencyTest.jar \
+ totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile
+ * see the initParameters method of this benchmark for details
+ */
+object ConcurrentQueryBenchmark {
+
+  // number of rows to generate
+  var totalNum = 1 * 10 * 1000
+  // size of the query thread pool
+  var threadNum = 16
+  // number of Spark SQL query tasks
+  var taskNum = 100
+  // whether the query result should be empty; if true, queries are expected to return no rows
+  var resultIsEmpty = true
+  // path where task details are stored
+  var path: String = "/tmp/carbondata"
+  // whether to run locally or in a cluster
+  var runInLocal = true
+  // whether to generate new data files
+  var generateFile = true
+  // whether to delete the data files afterwards
+  var deleteFile = true
+
+  val cardinalityId = 100 * 1000 * 1000
+  val cardinalityCity = 6
+
+  def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet"
+
+  def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
+
+  def carbonTableName(version: String): String =
+    "Num" + totalNum + "_" + s"comparetest_carbonV$version"
+
+
--- End diff -- ok --- |
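The quoted diff stops before the query-execution code, but the imports (Executors, Callable, Future, TimeUnit) and the threadNum/taskNum parameters imply a fixed-size thread pool running the same query from many tasks. A rough sketch of that pattern (the method name, query text, and timing details here are assumptions, not the code under review):

def runConcurrentQueries(spark: SparkSession, tableName: String): Unit = {
  // fixed-size pool: threadNum workers drain taskNum identical query tasks
  val pool = Executors.newFixedThreadPool(threadNum)
  val tasks = new util.ArrayList[Callable[Long]]()
  for (_ <- 1 to taskNum) {
    tasks.add(new Callable[Long] {
      override def call(): Long = {
        val start = System.currentTimeMillis()
        // generated ids are non-negative strings, so this filter returns no rows,
        // consistent with resultIsEmpty = true
        spark.sql(s"SELECT * FROM $tableName WHERE id = '-1'").collect()
        System.currentTimeMillis() - start
      }
    })
  }
  // invokeAll blocks until every task has finished
  val futures: util.List[Future[Long]] = pool.invokeAll(tasks)
  pool.shutdown()
  pool.awaitTermination(10, TimeUnit.MINUTES)
  // aggregate per-task elapsed times into an average
  var totalMs = 0L
  for (i <- 0 until futures.size()) {
    totalMs += futures.get(i).get()
  }
  println(s"$taskNum tasks on $threadNum threads: avg ${totalMs / taskNum} ms per query")
}

Running this once against the Parquet table and once against the CarbonData table would yield the two concurrent-query results the scaladoc promises to print.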