Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157698791 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 10 * 1000 * 1 * 10 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 10,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select * from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + , + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into 
parquet, carbonV2, carbonV3 + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = generateDataFrame(spark).cache + println(s"generating ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println(s"start running queries for $tableName...") + val start = System.currentTimeMillis() + println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + + "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(ThreadNum) + val results = new util.ArrayList[Future[Results]]() + for (num <- (1 to TaskNum).par) { + results.add(executorService.submit(new QueryTask(spark, sqlText))) --- End diff -- Ok, I am testing the invokeALl in local --- |
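For readers following the thread: the change under discussion replaces per-task submit() calls with a single invokeAll(). submit() hands the pool one Callable at a time and returns its Future immediately, while invokeAll() takes the whole task list and blocks until every task has finished, returning the Futures in submission order. A minimal, self-contained Scala sketch of both variants (EchoTask is a hypothetical stand-in for the example's QueryTask):

import java.util
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

// Hypothetical task that simply returns its input, standing in for QueryTask.
class EchoTask(payload: String) extends Callable[String] {
  override def call(): String = payload
}

object InvokeAllSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)

    // Variant 1: submit each task individually and collect the Futures by hand.
    val futures = new util.ArrayList[Future[String]]()
    for (i <- 1 to 10) {
      futures.add(pool.submit(new EchoTask(s"task-$i")))
    }

    // Variant 2: build the task list first and hand it to invokeAll, which blocks
    // until every task has completed and returns the Futures in the same order.
    val tasks = new util.ArrayList[Callable[String]]()
    for (i <- 1 to 10) {
      tasks.add(new EchoTask(s"task-$i"))
    }
    val results = pool.invokeAll(tasks)
    println(results.get(0).get()) // prints "task-1"

    pool.shutdown()
    pool.awaitTermination(60, TimeUnit.SECONDS)
  }
}

Variant 2 is what the later revision of the example adopts; it removes the hand-rolled Futures list and folds the wait for completion into the invokeAll call itself.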
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2135/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/911/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2424/ --- |
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/1670 retest sdv please --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157947459 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 100 * 1000 * 1000 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( --- End diff -- I think it is better to give an option to read queries, table name and database name from external file...so that any number of queries can be added without modifying the code.....This will also help to standalone run this class and specifying the query file path as an argument to it. If not now, you can enhance the framework in another PR --- |
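One possible shape for that enhancement, sketched under the assumption of a plain-text query file with one SQL statement per line; the file format, the loadQueries name and the fallback behaviour are illustrative, not part of this PR, and Query is the three-field case class the example already uses:

import scala.io.Source

// Hypothetical helper: read one SQL statement per non-empty, non-comment line and
// wrap each one in the example's Query(sqlText, queryType, desc) case class.
def loadQueries(path: String): Array[Query] = {
  val source = Source.fromFile(path)
  try {
    source.getLines()
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("--"))
      .map(sql => Query(sql, "external", s"loaded from $path"))
      .toArray
  } finally {
    source.close()
  }
}

// Usage sketch: fall back to the built-in query array when no file is given.
// val activeQueries = if (args.length > 4) loadQueries(args(4)) else queries

Passing the table and database names the same way (as extra arguments) would let the class run standalone against an existing table, as suggested above.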
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157947962 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 100 * 1000 * 1000 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select * from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + , + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into 
parquet, carbonV2, carbonV3 + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = generateDataFrame(spark).cache + println(s"generating ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println(s"start running queries for $tableName...") + val start = System.currentTimeMillis() + println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + + "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(ThreadNum) +// val results = new util.ArrayList[Future[Results]]() + val tasks = new util.ArrayList[Callable[Results]]() + + for (num <- 1 to TaskNum) { + tasks.add(new QueryTask(spark, sqlText)) + } + val results = executorService.invokeAll(tasks) + + val sql = s"query ${index + 1}: $sqlText " + printResult(results, sql) + executorService.shutdown() + executorService.awaitTermination(600, TimeUnit.SECONDS) + + val taskTime = (System.currentTimeMillis() - start).toDouble / 1000 + println("task time: " + taskTime + " s") + } + } + + def printResult(results: util.List[Future[Results]], sql: String = "") { + val timeArray = new Array[Double](results.size()) + val sqlResult = results.get(0).get().sqlResult + for (i <- 0 until results.size()) { + results.get(i).get() + } + for (i <- 0 until results.size()) { + timeArray(i) = results.get(i).get().time + } + val sortTimeArray = timeArray.sorted + + val time90 = ((sortTimeArray.length) * 0.9).toInt - 1 + val time99 = ((sortTimeArray.length) * 0.99).toInt - 1 + print("90%:" + sortTimeArray(time90) + " s," + + "\t99%:" + sortTimeArray(time99) + " s," + + "\tlast:" + sortTimeArray.last + " s," + + "\t" + sql + + "\taverage:" + (timeArray.sum / timeArray.length) + " s," + + "\t" + sqlResult.mkString(",") + "\t") + } + + case class Results(time: Double, sqlResult: Array[Row]) + + + class QueryTask(spark: SparkSession, query: String) + extends Callable[Results] with Serializable { + override def call(): Results = { + var result: Array[Row] = null + val rt = time { + result = spark.sql(query).head(1) + } + if (ResultIsEmpty) { + Results(rt, Array.empty[Row]) + } else { + Results(rt, result) + } + } + } + + // run testcases and print comparison result + def runTest(spark: SparkSession, table1: String, table2: String): Unit = { + // run queries on parquet and carbon + runQueries(spark, table1) + // do GC and sleep for some time before running next table + System.gc() + Thread.sleep(1000) + System.gc() + Thread.sleep(1000) + runQueries(spark, table2) + } + + def time(code: => Unit): Double = { + val start = System.currentTimeMillis() + code + // return time in second + (System.currentTimeMillis() - start).toDouble / 1000 + } + + def doParameter(arr: Array[String]): Unit = { + if (arr.length > 0) { + totalNum = arr(0).toInt + } + if (arr.length > 1) { + ThreadNum = arr(1).toInt + } + if (arr.length > 2) { + TaskNum = arr(2).toInt + } + if 
(arr.length > 3) { + ResultIsEmpty = if (arr(3).equalsIgnoreCase("true")) { + true + } else if (arr(3).equalsIgnoreCase("false")) { + true + } else { + throw new Exception("error parameter, should be true or false") + } + } + } + + def main(args: Array[String]): Unit = { + CarbonProperties.getInstance() + .addProperty("carbon.enable.vector.reader", "true") + .addProperty("enable.unsafe.sort", "true") + .addProperty("carbon.blockletgroup.size.in.mb", "32") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true") + import org.apache.spark.sql.CarbonSession._ + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + + val spark = SparkSession + .builder() + .master("local[8]") + .enableHiveSupport() + .config("spark.driver.host", "127.0.0.1") + .getOrCreateCarbonSession(storeLocation) + spark.sparkContext.setLogLevel("warn") + + doParameter(args) --- End diff -- Change the method name to initParameters --- |
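For reference, a minimal sketch of the renamed helper, keeping doParameter's positional arguments (row count, thread count, task count, resultIsEmpty flag) and the mutable fields it sets; note that the quoted diff assigns true in the "false" branch, which looks like a typo, so the sketch parses the flag explicitly:

def initParameters(arr: Array[String]): Unit = {
  if (arr.length > 0) totalNum = arr(0).toInt   // number of generated rows
  if (arr.length > 1) ThreadNum = arr(1).toInt  // threads in the query pool
  if (arr.length > 2) TaskNum = arr(2).toInt    // concurrent tasks per query
  if (arr.length > 3) {
    // Parse the flag rather than returning true in both branches.
    ResultIsEmpty = arr(3).toLowerCase match {
      case "true"  => true
      case "false" => false
      case other   => throw new IllegalArgumentException(
        s"error parameter '$other', should be true or false")
    }
  }
}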
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157948081 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 100 * 1000 * 1000 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select * from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + , + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into 
parquet, carbonV2, carbonV3 + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = generateDataFrame(spark).cache + println(s"generating ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println(s"start running queries for $tableName...") + val start = System.currentTimeMillis() + println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + + "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(ThreadNum) +// val results = new util.ArrayList[Future[Results]]() + val tasks = new util.ArrayList[Callable[Results]]() + + for (num <- 1 to TaskNum) { + tasks.add(new QueryTask(spark, sqlText)) + } + val results = executorService.invokeAll(tasks) + + val sql = s"query ${index + 1}: $sqlText " + printResult(results, sql) + executorService.shutdown() + executorService.awaitTermination(600, TimeUnit.SECONDS) + + val taskTime = (System.currentTimeMillis() - start).toDouble / 1000 + println("task time: " + taskTime + " s") + } + } + + def printResult(results: util.List[Future[Results]], sql: String = "") { + val timeArray = new Array[Double](results.size()) + val sqlResult = results.get(0).get().sqlResult + for (i <- 0 until results.size()) { + results.get(i).get() + } + for (i <- 0 until results.size()) { + timeArray(i) = results.get(i).get().time + } + val sortTimeArray = timeArray.sorted + + val time90 = ((sortTimeArray.length) * 0.9).toInt - 1 + val time99 = ((sortTimeArray.length) * 0.99).toInt - 1 --- End diff -- Please add a detailed comment for your printing logic and explain what is time90 and time99 --- |
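For reference, the intent behind those two indices: timeArray holds the elapsed time of every concurrent task for one query, and once sorted ascending, the element at length * 0.9 - 1 is the 90th-percentile latency (90% of the tasks finished at or below that time), while length * 0.99 - 1 is the 99th percentile. An annotated restatement of the quoted span, assuming at least two recorded timings so the computed indices stay non-negative:

val sortTimeArray = timeArray.sorted

// With, say, 100 task timings, index 89 holds the 90th-smallest value: 90% of the
// concurrent runs of this query finished within sortTimeArray(time90) seconds.
val time90 = (sortTimeArray.length * 0.9).toInt - 1

// Likewise index 98 holds the 99th-smallest value, i.e. the 99th-percentile latency;
// sortTimeArray.last is the slowest run.
val time99 = (sortTimeArray.length * 0.99).toInt - 1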
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157946201 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 100 * 1000 * 1000 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select * from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + , + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into 
parquet, carbonV2, carbonV3 + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = generateDataFrame(spark).cache + println(s"generating ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println(s"start running queries for $tableName...") + val start = System.currentTimeMillis() + println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + + "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(ThreadNum) +// val results = new util.ArrayList[Future[Results]]() + val tasks = new util.ArrayList[Callable[Results]]() + + for (num <- 1 to TaskNum) { + tasks.add(new QueryTask(spark, sqlText)) + } + val results = executorService.invokeAll(tasks) + + val sql = s"query ${index + 1}: $sqlText " + printResult(results, sql) + executorService.shutdown() + executorService.awaitTermination(600, TimeUnit.SECONDS) --- End diff -- why printResult is getting called before shutting down executorService? --- |
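On the ordering question: ExecutorService.invokeAll only returns once every submitted Callable has completed, so by the time printResult reads the Futures they are all done, and the subsequent shutdown/awaitTermination merely releases the pool's worker threads. An annotated restatement of the quoted sequence, reusing the names from the diff:

// invokeAll blocks until all TaskNum tasks have finished and returns their Futures.
val results = executorService.invokeAll(tasks)

// Safe even before shutdown: every Future in results is already completed here.
printResult(results, sql)

// Stop accepting new work, then wait (up to 10 minutes) for worker threads to exit.
executorService.shutdown()
executorService.awaitTermination(600, TimeUnit.SECONDS)

The same would hold with the earlier submit()-based version, since Future.get blocks until its task completes; shutting down the pool affects its lifecycle, not the availability of results.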
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670 SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2445/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/938/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2167/ --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157972084 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala --- @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File +import java.util +import java.util.concurrent.{Callable, Executors, Future, TimeUnit} + +import scala.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println +object ConcurrencyTest { + + var totalNum = 100 * 1000 * 1000 + var ThreadNum = 16 + var TaskNum = 100 + var ResultIsEmpty = true + val cardinalityId = 10000 * 10000 + val cardinalityCity = 6 + + def parquetTableName: String = "comparetest_parquet" + + def orcTableName: String = "comparetest_orc" + + def carbonTableName(version: String): String = s"comparetest_carbonV$version" + + // Table schema: + // +-------------+-----------+-------------+-------------+------------+ + // | id | string | 100,000,000 | dimension | no | + // +-------------+-----------+-------------+-------------+------------+ + // | Column name | Data type | Cardinality | Column type | Dictionary | + // +-------------+-----------+-------------+-------------+------------+ + // | city | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | country | string | 6 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | planet | string | 100,007 | dimension | yes | + // +-------------+-----------+-------------+-------------+------------+ + // | m1 | short | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m2 | int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m3 | big int | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m4 | double | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + // | m5 | decimal | NA | measure | no | + // +-------------+-----------+-------------+-------------+------------+ + + private def generateDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext + .parallelize(1 to totalNum, 4) + .map { x => + ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007, + (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, + BigDecimal.valueOf(x.toDouble / 11)) + }.map { x => + Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9) + } + + val 
schema = StructType( + Seq( + StructField("id", StringType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("planet", StringType, nullable = false), + StructField("m1", ShortType, nullable = false), + StructField("m2", IntegerType, nullable = false), + StructField("m3", LongType, nullable = false), + StructField("m4", DoubleType, nullable = false), + StructField("m5", DecimalType(30, 10), nullable = false) + ) + ) + + spark.createDataFrame(rdd, schema) + } + + // performance test queries, they are designed to test various data access type + val r = new Random() + val tmpId = r.nextInt(cardinalityId) % totalNum + val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum) + val queries: Array[Query] = Array( + Query( + "select * from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select id from $table" + s" where id = '$tmpId' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select * from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select city from $table" + s" where city = '$tmpCity' ", + "filter scan", + "filter on high card dimension" + ), + + Query( + "select country, sum(m1) from $table group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select country, sum(m1) from $table" + + s" where id = '$tmpId' group by country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ), + + Query( + "select t1.country, sum(t1.m1) from $table t1 join $table t2" + + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + , + Query( + "select t2.country, sum(t2.m1) " + + "from $table t1 join $table t2 join $table t3 " + + "join $table t4 join $table t5 join $table t6 join $table t7 " + + s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " + + s"and t1.id=t5.id and t1.id=t6.id and " + + s"t1.id=t7.id " + + s" where t2.id = '$tmpId' " + + s" group by t2.country", + "aggregate", + "group by on big data, on medium card column, medium result set," + ) + ) + + private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String) + : Double = time { + // partitioned by last 1 digit of id column + val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10)) + dfWithPartition.write + .partitionBy("partitionCol") + .mode(SaveMode.Overwrite) + .parquet(table) + spark.read.parquet(table).createOrReplaceTempView(table) + } + + private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time { + // partitioned by last 1 digit of id column + input.write + .mode(SaveMode.Overwrite) + .orc(table) + spark.read.orc(table).createOrReplaceTempView(table) + } + + private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + "3" + ) + spark.sql(s"drop table if exists $tableName") + time { + input.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "id") // id is high cardinality column + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + } + } + + // load data into 
parquet, carbonV2, carbonV3 + def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = { + val df = generateDataFrame(spark).cache + println(s"generating ${df.count} records, schema: ${df.schema}") + val table1Time = if (table1.endsWith("parquet")) { + loadParquetTable(spark, df, table1) + } else if (table1.endsWith("orc")) { + loadOrcTable(spark, df, table1) + } else { + sys.error("invalid table: " + table1) + } + val table2Time = loadCarbonTable(spark, df, table2) + println(s"load completed, time: $table1Time, $table2Time") + df.unpersist() + } + + // Run all queries for the specified table + private def runQueries(spark: SparkSession, tableName: String): Unit = { + println(s"start running queries for $tableName...") + val start = System.currentTimeMillis() + println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " + + "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false") + queries.zipWithIndex.map { case (query, index) => + val sqlText = query.sqlText.replace("$table", tableName) + + val executorService = Executors.newFixedThreadPool(ThreadNum) +// val results = new util.ArrayList[Future[Results]]() + val tasks = new util.ArrayList[Callable[Results]]() + + for (num <- 1 to TaskNum) { + tasks.add(new QueryTask(spark, sqlText)) + } + val results = executorService.invokeAll(tasks) + + val sql = s"query ${index + 1}: $sqlText " + printResult(results, sql) + executorService.shutdown() + executorService.awaitTermination(600, TimeUnit.SECONDS) + + val taskTime = (System.currentTimeMillis() - start).toDouble / 1000 + println("task time: " + taskTime + " s") + } + } + + def printResult(results: util.List[Future[Results]], sql: String = "") { + val timeArray = new Array[Double](results.size()) + val sqlResult = results.get(0).get().sqlResult + for (i <- 0 until results.size()) { + results.get(i).get() + } + for (i <- 0 until results.size()) { + timeArray(i) = results.get(i).get().time + } + val sortTimeArray = timeArray.sorted + + val time90 = ((sortTimeArray.length) * 0.9).toInt - 1 + val time99 = ((sortTimeArray.length) * 0.99).toInt - 1 + print("90%:" + sortTimeArray(time90) + " s," + + "\t99%:" + sortTimeArray(time99) + " s," + + "\tlast:" + sortTimeArray.last + " s," + + "\t" + sql + + "\taverage:" + (timeArray.sum / timeArray.length) + " s," + + "\t" + sqlResult.mkString(",") + "\t") + } + + case class Results(time: Double, sqlResult: Array[Row]) + + + class QueryTask(spark: SparkSession, query: String) + extends Callable[Results] with Serializable { + override def call(): Results = { + var result: Array[Row] = null + val rt = time { + result = spark.sql(query).head(1) + } + if (ResultIsEmpty) { + Results(rt, Array.empty[Row]) + } else { + Results(rt, result) + } + } + } + + // run testcases and print comparison result + def runTest(spark: SparkSession, table1: String, table2: String): Unit = { + // run queries on parquet and carbon + runQueries(spark, table1) + // do GC and sleep for some time before running next table + System.gc() + Thread.sleep(1000) + System.gc() + Thread.sleep(1000) + runQueries(spark, table2) + } + + def time(code: => Unit): Double = { + val start = System.currentTimeMillis() + code + // return time in second + (System.currentTimeMillis() - start).toDouble / 1000 + } + + def doParameter(arr: Array[String]): Unit = { + if (arr.length > 0) { + totalNum = arr(0).toInt + } + if (arr.length > 1) { + ThreadNum = arr(1).toInt + } + if (arr.length > 2) { + TaskNum = arr(2).toInt + } + if 
(arr.length > 3) { + ResultIsEmpty = if (arr(3).equalsIgnoreCase("true")) { + true + } else if (arr(3).equalsIgnoreCase("false")) { + true + } else { + throw new Exception("error parameter, should be true or false") + } + } + } + + def main(args: Array[String]): Unit = { + CarbonProperties.getInstance() + .addProperty("carbon.enable.vector.reader", "true") + .addProperty("enable.unsafe.sort", "true") + .addProperty("carbon.blockletgroup.size.in.mb", "32") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true") + import org.apache.spark.sql.CarbonSession._ + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + + val spark = SparkSession + .builder() + .master("local[8]") + .enableHiveSupport() + .config("spark.driver.host", "127.0.0.1") + .getOrCreateCarbonSession(storeLocation) + spark.sparkContext.setLogLevel("warn") + + doParameter(args) --- End diff -- ok, done --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157973096

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala ---
+  def printResult(results: util.List[Future[Results]], sql: String = "") {
+    val timeArray = new Array[Double](results.size())
+    val sqlResult = results.get(0).get().sqlResult
+    for (i <- 0 until results.size()) {
+      results.get(i).get()
+    }
+    for (i <- 0 until results.size()) {
+      timeArray(i) = results.get(i).get().time
+    }
+    val sortTimeArray = timeArray.sorted
+
+    val time90 = ((sortTimeArray.length) * 0.9).toInt - 1
+    val time99 = ((sortTimeArray.length) * 0.99).toInt - 1
--- End diff --

done

---
|
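To make the 90%/99% figures above concrete: with the default TaskNum of 100 the sorted latency array has 100 entries, so the lookup picks the entries at indices 89 and 98. A small stand-alone sketch with fabricated latencies (not measurements from the patch), pasteable into a Scala REPL:

// sketch: percentile lookup as done in printResult, with 100 fake ascending latencies
val latencies: Array[Double] = Array.tabulate(100)(i => 0.01 * (i + 1))
val sorted = latencies.sorted

val idx90 = (sorted.length * 0.9).toInt - 1   // 89 for 100 samples
val idx99 = (sorted.length * 0.99).toInt - 1  // 98 for 100 samples

println(s"90%: ${sorted(idx90)} s, 99%: ${sorted(idx99)} s, last: ${sorted.last} s")

Note that the index formula assumes a reasonably large sample: for a single task, (1 * 0.9).toInt - 1 is -1 and the lookup would fail.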
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157974357

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala ---
+      for (num <- 1 to TaskNum) {
+        tasks.add(new QueryTask(spark, sqlText))
+      }
+      val results = executorService.invokeAll(tasks)
+
+      val sql = s"query ${index + 1}: $sqlText "
+      printResult(results, sql)
+      executorService.shutdown()
+      executorService.awaitTermination(600, TimeUnit.SECONDS)
--- End diff --

Because printResult invokes the get() method (java.util.concurrent.Future#get()), which waits if necessary for the computation to complete and then retrieves its result, all tasks have already finished by the time executorService is shut down.

---
|
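A minimal sketch of the behaviour described above (hypothetical names, not part of the patch): invokeAll only returns once every submitted task is done, so the later get() calls return immediately and shutdown() finds nothing still running.

import java.util
import java.util.concurrent.{Callable, Executors, TimeUnit}

object InvokeAllSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    val tasks = new util.ArrayList[Callable[Int]]()
    for (i <- 1 to 4) {
      tasks.add(new Callable[Int] {
        override def call(): Int = { Thread.sleep(100); i }
      })
    }

    // blocks until all four tasks have completed
    val futures = pool.invokeAll(tasks)

    // every Future is already done, so get() does not wait
    (0 until futures.size()).foreach(i => assert(futures.get(i).isDone))
    println((0 until futures.size()).map(i => futures.get(i).get()).mkString(","))

    pool.shutdown()                              // affects no running task
    pool.awaitTermination(10, TimeUnit.SECONDS)  // returns promptly
  }
}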
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157974643

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala ---
+  // performance test queries, they are designed to test various data access type
+  val r = new Random()
+  val tmpId = r.nextInt(cardinalityId) % totalNum
+  val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
+  val queries: Array[Query] = Array(
--- End diff --

OK, I will enhance the framework.

---
|
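For anyone extending the query set discussed above: each entry is a SQL template whose literal $table placeholder runQueries substitutes per table via query.sqlText.replace("$table", tableName). Below is a hedged sketch of one extra aggregation query, assuming the same three-field Query(sqlText, type, description) shape used in the array; the exact field names of Query are not shown in this hunk.

// hypothetical additional entry for the queries array; only sqlText is executed,
// the other two fields are descriptive labels used for reporting
Query(
  "select planet, count(*), avg(m4) from $table group by planet order by planet",
  "aggregate",
  "group by on medium card column, medium result set"
)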
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/1670 LGTM... once the build runs I will merge --- |
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/1670 retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/973/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2196/ --- |