Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/94/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2628 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6476/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8165/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2628 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/100/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2628 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8171/ --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213923265 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java --- @@ -17,42 +17,99 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.format.CompressionCodec; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class), + ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class); + + private CompressionCodec codec; + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) { + this.codec = codec; + this.name = name; + this.compressorClass = compressorCls; + } + + public CompressionCodec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch (compressorType) { - case "snappy": - snappyCompressor = new 
SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } public static CompressorFactory getInstance() { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() --- End diff -- Is compressorType in case sensitive? --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213929653 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java --- @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; + +import org.apache.carbondata.core.util.ByteUtil; + +import com.github.luben.zstd.Zstd; + +public class ZstdCompressor implements Compressor { + private static final int COMPRESS_LEVEL = 3; --- End diff -- What is COMPRESS_LEVEL? --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213940095 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -66,15 +66,19 @@ protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)); + // compressor to be used for this column + protected String columnCompressorName; --- End diff -- ColumnPageEncoderMeta also has a compressor name variable. Is it necessary to refactor it? --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213934733 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -94,222 +98,240 @@ public void setStatsCollector(ColumnPageStatsCollector statsCollector) { } private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new SafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage 
createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, - DataType dataType, int pageSize, int eachValueSize) { + DataType dataType, int pageSize, int eachValueSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize, + compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (DataTypes.isDecimal(dataType)) { - return createDecimalPage(columnSpec, dataType, pageSize); + return createDecimalPage(columnSpec, dataType, pageSize, compressorName); } else if (dataType.equals(BYTE_ARRAY)) { - return createVarLengthPage(columnSpec, dataType, pageSize); + return createVarLengthPage(columnSpec, dataType, pageSize, compressorName); } else { - return createFixLengthPage(columnSpec, dataType, pageSize); + return createFixLengthPage(columnSpec, dataType, pageSize, compressorName); } } public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) + int pageSize, String compressor) throws MemoryException { - return newPage(columnSpec, dataType, pageSize); + return newPage(columnSpec, dataType, pageSize, compressor); } public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, LocalDictionaryGenerator localDictionaryGenerator, - boolean isComplexTypePrimitive) throws MemoryException { + boolean isComplexTypePrimitive, String compressorName) throws MemoryException { if (unsafe) { - return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new 
UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, - CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, + pageSize, CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } else { - return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = + new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, --- End diff -- This is the same as line 178. --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213942049 --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java --- @@ -357,8 +357,8 @@ private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumn columnSchema.write(dataOutput); } byte[] byteArray = stream.toByteArray(); - // Compress with snappy to reduce the size of schema - return CompressorFactory.getInstance().getCompressor().compressByte(byteArray); + // Compress to reduce the size of schema + return CompressorFactory.getInstance().getCompressor("snappy").compressByte(byteArray); --- End diff -- SupportedCompressor.SNAPPY.getName --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213937956 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -94,222 +98,240 @@ public void setStatsCollector(ColumnPageStatsCollector statsCollector) { } private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new SafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage 
createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, - DataType dataType, int pageSize, int eachValueSize) { + DataType dataType, int pageSize, int eachValueSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize, + compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (DataTypes.isDecimal(dataType)) { - return createDecimalPage(columnSpec, dataType, pageSize); + return createDecimalPage(columnSpec, dataType, pageSize, compressorName); } else if (dataType.equals(BYTE_ARRAY)) { - return createVarLengthPage(columnSpec, dataType, pageSize); + return createVarLengthPage(columnSpec, dataType, pageSize, compressorName); } else { - return createFixLengthPage(columnSpec, dataType, pageSize); + return createFixLengthPage(columnSpec, dataType, pageSize, compressorName); } } public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) + int pageSize, String compressor) throws MemoryException { - return newPage(columnSpec, dataType, pageSize); + return newPage(columnSpec, dataType, pageSize, compressor); } public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, LocalDictionaryGenerator localDictionaryGenerator, - boolean isComplexTypePrimitive) throws MemoryException { + boolean isComplexTypePrimitive, String compressorName) throws MemoryException { if (unsafe) { - return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new 
UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, - CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, + pageSize, CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } else { - return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = + new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } } /** * Create a new page of dataType and number of row = pageSize */ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataType, --- End diff -- How about changing the first parameter to ColumnPageEncoderMeta instead of adding an extra parameter? --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r213922415 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java --- @@ -17,42 +17,99 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.format.CompressionCodec; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class), + ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class); + + private CompressionCodec codec; + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) { + this.codec = codec; + this.name = name; + this.compressorClass = compressorCls; + } + + public CompressionCodec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch (compressorType) { - case "snappy": - snappyCompressor = new 
SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } public static CompressorFactory getInstance() { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); + if (!compressors.keySet().contains(compressorType)) { --- End diff -- use compressors.containsKey directly --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214560354 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java --- @@ -17,42 +17,99 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.format.CompressionCodec; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class), + ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class); + + private CompressionCodec codec; + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) { + this.codec = codec; + this.name = name; + this.compressorClass = compressorCls; + } + + public CompressionCodec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch (compressorType) { - case "snappy": - snappyCompressor = new 
SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } public static CompressorFactory getInstance() { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); + if (!compressors.keySet().contains(compressorType)) { --- End diff -- OK~ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214560410 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java --- @@ -17,42 +17,99 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.format.CompressionCodec; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class), + ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class); + + private CompressionCodec codec; + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) { + this.codec = codec; + this.name = name; + this.compressorClass = compressorCls; + } + + public CompressionCodec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch (compressorType) { - case "snappy": - snappyCompressor = new 
SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } public static CompressorFactory getInstance() { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() --- End diff -- not case sensitive, internally it will be converted to use lowercase --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214560533 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java --- @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; + +import org.apache.carbondata.core.util.ByteUtil; + +import com.github.luben.zstd.Zstd; + +public class ZstdCompressor implements Compressor { + private static final int COMPRESS_LEVEL = 3; --- End diff -- It's a common parameter for almost all compressors. The higher the level, the better the compression ratio, but the more time it takes. Here Zstd uses level 3 by default for common use. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214560605 --- Diff: core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java --- @@ -357,8 +357,8 @@ private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumn columnSchema.write(dataOutput); } byte[] byteArray = stream.toByteArray(); - // Compress with snappy to reduce the size of schema - return CompressorFactory.getInstance().getCompressor().compressByte(byteArray); + // Compress to reduce the size of schema + return CompressorFactory.getInstance().getCompressor("snappy").compressByte(byteArray); --- End diff -- OK~ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214561432 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -94,222 +98,240 @@ public void setStatsCollector(ColumnPageStatsCollector statsCollector) { } private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new SafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage 
createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, - DataType dataType, int pageSize, int eachValueSize) { + DataType dataType, int pageSize, int eachValueSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize, + compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (DataTypes.isDecimal(dataType)) { - return createDecimalPage(columnSpec, dataType, pageSize); + return createDecimalPage(columnSpec, dataType, pageSize, compressorName); } else if (dataType.equals(BYTE_ARRAY)) { - return createVarLengthPage(columnSpec, dataType, pageSize); + return createVarLengthPage(columnSpec, dataType, pageSize, compressorName); } else { - return createFixLengthPage(columnSpec, dataType, pageSize); + return createFixLengthPage(columnSpec, dataType, pageSize, compressorName); } } public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) + int pageSize, String compressor) throws MemoryException { - return newPage(columnSpec, dataType, pageSize); + return newPage(columnSpec, dataType, pageSize, compressor); } public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, LocalDictionaryGenerator localDictionaryGenerator, - boolean isComplexTypePrimitive) throws MemoryException { + boolean isComplexTypePrimitive, String compressorName) throws MemoryException { if (unsafe) { - return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new 
UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, - CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, + pageSize, CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } else { - return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = + new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, --- End diff -- ok, extract the code --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214583592 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -94,222 +98,240 @@ public void setStatsCollector(ColumnPageStatsCollector statsCollector) { } private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new SafeDecimalColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage 
createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, - DataType dataType, int pageSize, int eachValueSize) { + DataType dataType, int pageSize, int eachValueSize, String compressorName) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize, + compressorName); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, compressorName); } } private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + int pageSize, String compressorName) { if (DataTypes.isDecimal(dataType)) { - return createDecimalPage(columnSpec, dataType, pageSize); + return createDecimalPage(columnSpec, dataType, pageSize, compressorName); } else if (dataType.equals(BYTE_ARRAY)) { - return createVarLengthPage(columnSpec, dataType, pageSize); + return createVarLengthPage(columnSpec, dataType, pageSize, compressorName); } else { - return createFixLengthPage(columnSpec, dataType, pageSize); + return createFixLengthPage(columnSpec, dataType, pageSize, compressorName); } } public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) + int pageSize, String compressor) throws MemoryException { - return newPage(columnSpec, dataType, pageSize); + return newPage(columnSpec, dataType, pageSize, compressor); } public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, LocalDictionaryGenerator localDictionaryGenerator, - boolean isComplexTypePrimitive) throws MemoryException { + boolean isComplexTypePrimitive, String compressorName) throws MemoryException { if (unsafe) { - return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new 
UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, - CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, + pageSize, CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } else { - return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), - localDictionaryGenerator, isComplexTypePrimitive); + ColumnPage actualPage = + new SafeVarLengthColumnPage(columnSpec, dataType, pageSize, compressorName); + ColumnPage encodedPage = + new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, compressorName); + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, compressorName); } } /** * Create a new page of dataType and number of row = pageSize */ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataType, --- End diff -- OK, fixed --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2628#discussion_r214583607

--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---
@@ -66,15 +66,19 @@
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
           CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT));
+  // compressor to be used for this column
+  protected String columnCompressorName;
--- End diff --

refactored~

--- |
Free forum by Nabble | Edit this page |