GitHub user xubo245 opened a pull request:
https://github.com/apache/carbondata/pull/1670

[CARBONDATA-1899] Add CarbonData concurrency test case

Add CarbonData concurrency test case.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:
- [ ] Any interfaces changed? No
- [ ] Any backward compatibility impacted? No
- [ ] Document update required? No
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. No

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xubo245/carbondata concurrentTest

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1670.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1670

----

commit 82fe8efca1397c440ae8524e589863e4a81de7ef
Author: xubo245 <[hidden email]>
Date: 2017-12-15T14:10:00Z

    [CARBONDATA-1899] Add CarbonData concurrency test case

----
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2014/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/794/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2337/
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157356774

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/ConcurrencyTest.scala ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataretention
--- End diff --

You do not need to add this as a test case; I think you can add it in a separate benchmark module.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157363538

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/ConcurrencyTest.scala ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataretention
--- End diff --

Do you mean adding it to the example module?

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2074/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2375/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/849/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/851/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2076/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2377/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2378/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/862/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2087/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1670
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2385/
---
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/1670
retest this please
---
Github user ManoharVanam commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1670#discussion_r157472681

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala ---
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+// scalastyle:off println
+object ConcurrencyTest {
+
+  var totalNum = 10 * 1000 * 1 * 10
+  var ThreadNum = 16
+  var TaskNum = 100
+  var ResultIsEmpty = true
+  val cardinalityId = 10000 * 10000
+  val cardinalityCity = 6
+
+  def parquetTableName: String = "comparetest_parquet"
+
+  def orcTableName: String = "comparetest_orc"
+
+  def carbonTableName(version: String): String = s"comparetest_carbonV$version"
+
+  // Table schema:
+  // +-------------+-----------+-------------+-------------+------------+
+  // | Column name | Data type | Cardinality | Column type | Dictionary |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | id          | string    | 10,000,000  | dimension   | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | city        | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | country     | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | planet      | string    | 100,007     | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m1          | short     | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m2          | int       | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m3          | big int   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m4          | double    | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m5          | decimal   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+
+  private def generateDataFrame(spark: SparkSession): DataFrame = {
+    val rdd = spark.sparkContext
+      .parallelize(1 to totalNum, 4)
+      .map { x =>
+        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
+          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+          BigDecimal.valueOf(x.toDouble / 11))
+      }.map { x =>
+        Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+      }
+
+    val schema = StructType(
+      Seq(
+        StructField("id", StringType, nullable = false),
+        StructField("city", StringType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("planet", StringType, nullable = false),
+        StructField("m1", ShortType, nullable = false),
+        StructField("m2", IntegerType, nullable = false),
+        StructField("m3", LongType, nullable = false),
+        StructField("m4", DoubleType, nullable = false),
+        StructField("m5", DecimalType(30, 10), nullable = false)
+      )
+    )
+
+    spark.createDataFrame(rdd, schema)
+  }
+
+  // performance test queries, they are designed to test various data access type
+  val r = new Random()
+  val tmpId = r.nextInt(cardinalityId) % totalNum
+  val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
+  val queries: Array[Query] = Array(
+    Query(
+      "select * from $table" + s" where id = '$tmpId' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+
+    Query(
+      "select id from $table" + s" where id = '$tmpId' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+
+    Query(
+      "select * from $table" + s" where city = '$tmpCity' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+
+    Query(
+      "select city from $table" + s" where city = '$tmpCity' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+
+    Query(
+      "select country, sum(m1) from $table group by country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select country, sum(m1) from $table" +
+        s" where id = '$tmpId' group by country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select t1.country, sum(t1.m1) from $table t1 join $table t2" +
+        s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select t2.country, sum(t2.m1) " +
+        "from $table t1 join $table t2 join $table t3 " +
+        "join $table t4 join $table t5 join $table t6 join $table t7 " +
+        s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
+        s"and t1.id=t5.id and t1.id=t6.id and " +
+        s"t1.id=t7.id " +
+        s" where t2.id = '$tmpId' " +
+        s" group by t2.country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    )
+  )
+
+  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
+  : Double = time {
+    // partitioned by last 1 digit of id column
+    val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
+    dfWithPartition.write
+      .partitionBy("partitionCol")
+      .mode(SaveMode.Overwrite)
+      .parquet(table)
+    spark.read.parquet(table).createOrReplaceTempView(table)
+  }
+
+  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
+    // partitioned by last 1 digit of id column
+    input.write
+      .mode(SaveMode.Overwrite)
+      .orc(table)
+    spark.read.orc(table).createOrReplaceTempView(table)
+  }
+
+  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+      "3"
+    )
+    spark.sql(s"drop table if exists $tableName")
+    time {
+      input.write
+        .format("carbondata")
+        .option("tableName", tableName)
+        .option("tempCSV", "false")
+        .option("single_pass", "true")
+        .option("dictionary_exclude", "id") // id is high cardinality column
+        .option("table_blocksize", "32")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+  }
+
+  // load data into parquet, carbonV2, carbonV3
+  def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
+    val df = generateDataFrame(spark).cache
+    println(s"generating ${df.count} records, schema: ${df.schema}")
+    val table1Time = if (table1.endsWith("parquet")) {
+      loadParquetTable(spark, df, table1)
+    } else if (table1.endsWith("orc")) {
+      loadOrcTable(spark, df, table1)
+    } else {
+      sys.error("invalid table: " + table1)
+    }
+    val table2Time = loadCarbonTable(spark, df, table2)
+    println(s"load completed, time: $table1Time, $table2Time")
+    df.unpersist()
+  }
+
+  // Run all queries for the specified table
+  private def runQueries(spark: SparkSession, tableName: String): Unit = {
+    println(s"start running queries for $tableName...")
+    val start = System.currentTimeMillis()
+    println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " +
+      "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false")
+    queries.zipWithIndex.map { case (query, index) =>
+      val sqlText = query.sqlText.replace("$table", tableName)
+
+      val executorService = Executors.newFixedThreadPool(ThreadNum)
+      val results = new util.ArrayList[Future[Results]]()
+      for (num <- (1 to TaskNum).par) {
+        results.add(executorService.submit(new QueryTask(spark, sqlText)))
--- End diff --

For better concurrency, I think invokeAll is a better option than submitting the tasks one at a time.

---
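The reviewer's suggestion refers to `ExecutorService.invokeAll`, which takes a whole batch of `Callable`s in one call and blocks until every task has completed, instead of looping over `submit`. A minimal plain-Java sketch of that pattern (the squaring task is a placeholder, not the PR's `QueryTask`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Build the full batch of tasks up front ...
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            tasks.add(() -> n * n); // placeholder for a query task
        }

        // ... then hand them to the pool in one call; invokeAll blocks
        // until all tasks are done and returns their Futures in order.
        List<Future<Integer>> results = pool.invokeAll(tasks);

        int sum = 0;
        for (Future<Integer> f : results) {
            sum += f.get();
        }
        pool.shutdown();
        System.out.println(sum); // sum of squares 0..9 = 285
    }
}
```

Besides submitting the batch atomically, `invokeAll` also guarantees that all returned `Future`s are already complete, so collecting results needs no extra synchronization.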
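The header printed by `runQueries` reports 90%, 99%, last, and average times per query. The excerpt above is truncated before the aggregation code, so as a hedged illustration only: percentile latencies of this kind are commonly computed with the nearest-rank method over the sorted per-task timings (the sample values below are made up):

```java
import java.util.Arrays;

public class PercentileDemo {
    // Nearest-rank percentile: ceil(p/100 * n)-th smallest sample,
    // with the rank computed in exact integer arithmetic.
    static double percentile(double[] samples, int p) {
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (p * sorted.length + 99) / 100;
        return sorted[Math.max(0, rank - 1)];
    }

    public static void main(String[] args) {
        // hypothetical per-task latencies in seconds (one slow outlier)
        double[] latencies = {0.8, 1.2, 0.9, 5.0, 1.1, 1.0, 0.7, 1.3, 0.95, 1.05};
        System.out.println("90% time: " + percentile(latencies, 90) + " sec");
        System.out.println("99% time: " + percentile(latencies, 99) + " sec");
    }
}
```

Note how the 99th percentile surfaces the single slow task that an average would largely hide, which is exactly why concurrency benchmarks report tail latencies alongside the mean.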
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/872/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1670
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2097/
---