Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2628 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6264/ --- |
Github user xuchuanyin closed the pull request at:
https://github.com/apache/carbondata/pull/2628 --- |
GitHub user xuchuanyin reopened a pull request:
https://github.com/apache/carbondata/pull/2628 [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

1. add zstd compressor for compressing column data
2. add zstd support in thrift
3. since zstd does not support zero-copy while compressing, offheap will not take effect for zstd
4. the column compressor is configured through a system property and can be changed for each load; during querying, carbondata gets the compressor information from the metadata in the data files
5. this PR also considers and has been verified on the legacy store and compaction

A simple test with 1.2 GB of raw CSV data shows the size (in MB) of the final store with different compressors:

| local dictionary | snappy | zstd | Size Reduced |
| --- | --- | --- | --- |
| local dict enabled | 335 | 207 | 38.2% |
| local dict disabled | 375 | 225 | 40% |

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [x] Any interfaces changed? `Yes, only internally used interfaces are changed`
- [x] Any backward compatibility impacted? `Yes, backward compatibility is handled`
- [x] Document update required? `Yes`
- [x] Testing done
  - Whether new unit test cases have been added or why no new tests are required? `Added tests`
  - How it is tested? Please attach test report. `Tested on a local machine`
  - Is it a performance related change? Please attach the performance test report. `The size of the final store is decreased by 40% compared with the default snappy`
  - Any additional information to help reviewers in testing this change. `NA`
- [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. `NA`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0810_support_zstd_compressor_final_store

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2628.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2628

----

commit e840c5bcfe7c27a5d2eb6459d7e391dac0a2091f
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-08-10T14:02:57Z

    Support zstd as column compressor in final store
    1. add zstd compressor for compressing column data
    2. add zstd support in thrift
    3. legacy store is not considered in this commit
    4. since zstd does not support zero-copy while compressing, offheap will not take effect for zstd
    5. support lazy load for compressor

commit 926d64a245e76448d71d0557941d7e29559571c7
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-08-13T13:45:42Z

    Support new compressor on legacy store
    In the query procedure we need to decompress the column page. Previously we got the compressor from the system property; now that we support new compressors, we should read the compressor information from the metadata in the data files. This PR also solves the compatibility-related problems on the V1/V2 store, where only snappy is supported.

commit df0ca034b74cde02ec3568b8b5b94930a44c5763
Author: xuchuanyin <xuchuanyin@...>
Date: 2018-08-14T08:38:00Z

    fix comments

----
--- |
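The per-load compressor switch described in point 4 can be pictured with the following minimal sketch. It is only illustrative: it is based on the property names used in the PR's test code, `demo_table` and `source_table` are placeholder names, and `sql` is assumed to be a Spark SQL entry point as in the test suite.

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// The compressor is read from the system property when a load starts,
// so different segments of the same table may use different compressors.
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
sql("INSERT INTO demo_table SELECT * FROM source_table")  // this segment uses snappy

CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
sql("INSERT INTO demo_table SELECT * FROM source_table")  // this segment uses zstd

// Queries need no extra configuration: the compressor name is read back
// from the metadata stored in each data file.
sql("SELECT count(*) FROM demo_table").show()
```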
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2628 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6265/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7910/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6634/ --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2628 retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7911/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6635/ --- |
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2628 @xuchuanyin It is very promising in terms of compression, great! Have you also tested the decompression speed? It would be great if it also improves decompression speed. --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r210782737 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.integration.spark.testsuite.dataload + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestLoadDataWithCompressor extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + private val tableName = "load_test_with_compressor" + + override protected def afterEach(): Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, + CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, + CarbonCommonConstants.DEFAULT_COMPRESSOR) + sql(s"DROP TABLE IF EXISTS $tableName") + } + + private def createTable(): Unit = { + sql(s"DROP TABLE IF EXISTS $tableName") + sql( + s""" + | CREATE TABLE $tableName( + | booleanField boolean, + | shortField smallint, + | intField int, + | bigintField bigint, + | doubleField double, + | stringField string, + | timestampField timestamp, + | decimalField decimal(18,2), + | dateField date, + | charField string, + | floatField float, + | stringDictField string, + | stringSortField string, + | stringLocalDictField string, + | longStringField string + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'LONG_STRING_COLUMNS'='longStringField', + | 'SORT_COLUMNS'='stringSortField', + | 'DICTIONARY_INCLUDE'='stringDictField', + | 'local_dictionary_enable'='true', + | 'local_dictionary_threshold'='10000', + | 'local_dictionary_include'='stringLocalDictField') + """.stripMargin) + } + + private def loadData(): Unit = { + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,1,11,101,41.4,'string1','2015/4/23 12:01:01',12.34,'2015/4/23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,3,13,163,43.4,'string3','2015/7/26 12:01:06',34.56,'2015/7/26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015/4/23 12:01:01',${Double.MinValue + 
2},'2015/4/23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015/7/26 12:01:06',${Double.MinValue + 2},'2015/7/26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + } + + private def testQuery(): Unit = { + sql(s"SELECT * FROM $tableName").show(false) + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8))) + } + + test("test data loading with snappy compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + testQuery() + } + + test("test data loading with zstd compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + testQuery() + } + + test("test data loading with zstd compressor and onheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + testQuery() + } + + test("test current zstd compressor on legacy store with snappy") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + loadData() --- End diff -- It is better to add loading option also if you want to specify the compressor for every load --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r211130605 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- + test("test current zstd compressor on legacy store with snappy") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + loadData() --- End diff -- Currently the compressor is only configured through the system property when the load is triggered. Furthermore, we can support configuring it in TBLPROPERTIES or a load option. We will not do it in this PR~ --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2628 @jackylk We do not have the environment to test this now. Based on Zstd's official documentation, its decompression speed will be worse than Snappy's, but we still do not know how much that affects query performance. --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r211844549 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- + test("test data loading with snappy compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() --- End diff -- please add a testcase for 3 column pages to test the multiple page scenario --- |
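A multi-page test along the lines requested could look like the sketch below. It is only illustrative: it assumes the default column page size of 32,000 rows (so 100,000 rows force several pages per blocklet), and the helper table, view names, and row count are made up rather than taken from the PR.

```scala
test("test zstd compressor with multiple column pages") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
  sql("DROP TABLE IF EXISTS multi_page_test")
  sql("CREATE TABLE multi_page_test(id int, name string) STORED BY 'carbondata'")
  // With roughly 32,000 rows per column page by default, 100,000 rows
  // make each column span multiple pages within a blocklet.
  import sqlContext.implicits._
  sqlContext.sparkContext.parallelize(1 to 100000)
    .map(i => (i, s"name_$i"))
    .toDF("id", "name")
    .createOrReplaceTempView("multi_page_source")
  sql("INSERT INTO TABLE multi_page_test SELECT * FROM multi_page_source")
  checkAnswer(sql("SELECT count(*) FROM multi_page_test"), Seq(Row(100000)))
  sql("DROP TABLE IF EXISTS multi_page_test")
}
```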
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r211845798 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.integration.spark.testsuite.dataload + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestLoadDataWithCompressor extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + private val tableName = "load_test_with_compressor" + + override protected def afterEach(): Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, + CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, + CarbonCommonConstants.DEFAULT_COMPRESSOR) + sql(s"DROP TABLE IF EXISTS $tableName") + } + + private def createTable(): Unit = { + sql(s"DROP TABLE IF EXISTS $tableName") + sql( + s""" + | CREATE TABLE $tableName( + | booleanField boolean, + | shortField smallint, + | intField int, + | bigintField bigint, + | doubleField double, + | stringField string, + | timestampField timestamp, + | decimalField decimal(18,2), + | dateField date, + | charField string, + | floatField float, + | stringDictField string, + | stringSortField string, + | stringLocalDictField string, + | longStringField string + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'LONG_STRING_COLUMNS'='longStringField', + | 'SORT_COLUMNS'='stringSortField', + | 'DICTIONARY_INCLUDE'='stringDictField', + | 'local_dictionary_enable'='true', + | 'local_dictionary_threshold'='10000', + | 'local_dictionary_include'='stringLocalDictField') + """.stripMargin) + } + + private def loadData(): Unit = { + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,1,11,101,41.4,'string1','2015/4/23 12:01:01',12.34,'2015/4/23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,3,13,163,43.4,'string3','2015/7/26 12:01:06',34.56,'2015/7/26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015/4/23 12:01:01',${Double.MinValue + 
2},'2015/4/23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015/7/26 12:01:06',${Double.MinValue + 2},'2015/7/26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + } + + private def testQuery(): Unit = { + sql(s"SELECT * FROM $tableName").show(false) + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8))) + } + + test("test data loading with snappy compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + testQuery() + } + + test("test data loading with zstd compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + testQuery() + } + + test("test data loading with zstd compressor and onheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + testQuery() + } + + test("test current zstd compressor on legacy store with snappy") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + loadData() + sql(s"SELECT * FROM $tableName").show(false) + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test current snappy compressor on legacy store with zstd") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + loadData() + sql(s"SELECT * FROM $tableName").show(false) + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) + } + + test("test compaction with different compressor for each load") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + createTable() + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd") + loadData() + + 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + loadData() + + // there are 8 loads + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8))) + assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8) + sql(s"ALTER TABLE $tableName COMPACT 'major'") + sql(s"CLEAN FILES FOR TABLE $tableName") + // after compaction and clean, there should be on segment + checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8))) + assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1) + } + + test("test data loading with unsupported compressor and onheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake") + createTable() + val exception = intercept[Throwable] { --- End diff -- please intercept the exact exception install of Throwable --- |
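What the reviewer asks for here, intercepting the exact exception instead of Throwable, would look roughly like the sketch below. The exception class and the message fragment are assumptions for illustration; the actual type thrown for an unknown compressor may differ.

```scala
test("test data loading with unsupported compressor") {
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
  createTable()
  // Catch a specific exception type rather than Throwable, so that an
  // unrelated failure cannot make this test pass by accident.
  val exception = intercept[UnsupportedOperationException] {
    loadData()
  }
  // The message check is illustrative; match whatever the compressor factory reports.
  assert(exception.getMessage.contains("compressor"))
}
```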
Github user QiangCai commented on the issue:
https://github.com/apache/carbondata/pull/2628 At the beginning of loading, it should specify a compressor name and use this value during the whole loading procedure. --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r212533227 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- + test("test data loading with snappy compressor and offheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") + createTable() + loadData() --- End diff -- tests added --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r212533250 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompressor.scala --- + test("test data loading with unsupported compressor and onheap") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake") + createTable() + val exception = intercept[Throwable] { --- End diff -- optimized --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2628 @QiangCai Fixed. We now get and validate the compressor before loading and use that compressor throughout the loading procedure. Tests are added to verify this: during data loading we dynamically change the compressor property, and the load still completes correctly. --- |
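The approach described here, resolving and validating the compressor once and then reusing it for the whole load, can be sketched as follows. The import path and the `getCompressor(name)` lookup on `CompressorFactory` are assumptions about the factory API touched in this PR and may not match the final signature.

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.util.CarbonProperties

// Resolve the compressor name once, before the load starts.
val compressorName = CarbonProperties.getInstance()
  .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR)
// Validating it up front fails fast on an unknown name; the validated name is
// then carried through the whole load, so later property changes cannot
// affect segments that are already being written.
val compressor = CompressorFactory.getInstance().getCompressor(compressorName)
```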
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2628 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6371/ --- |