Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185235683 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java --- @@ -36,25 +41,39 @@ @InterfaceStability.Evolving public abstract class DataMapWriter { - protected AbsoluteTableIdentifier identifier; + protected String tablePath; protected String segmentId; - protected String writeDirectoryPath; + protected String dataMapPath; - public DataMapWriter(AbsoluteTableIdentifier identifier, Segment segment, - String writeDirectoryPath) { - this.identifier = identifier; + private List<CarbonColumn> indexColumns; + + public DataMapWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment, + String shardName) throws MalformedDataMapCommandException { + this(carbonTable.getTablePath(), dataMapSchema.getDataMapName(), + carbonTable.getIndexedColumns(dataMapSchema), segment, shardName); + } + + public DataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns, + Segment segment, String shardName) { + this.tablePath = tablePath; this.segmentId = segment.getSegmentNo(); - this.writeDirectoryPath = writeDirectoryPath; + this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName( + tablePath, segmentId, dataMapName, shardName); + this.indexColumns = indexColumns; + } + + protected final List<CarbonColumn> getIndexColumns() { + return indexColumns; } /** * Start of new block notification. * * @param blockId file name of the carbondata file */ - public abstract void onBlockStart(String blockId, String indexShardName) throws IOException; + public abstract void onBlockStart(String blockId) throws IOException; --- End diff -- Why not define an exception to wrap the exceptions during writing datamap? --- |
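A minimal sketch of the kind of wrapper exception being suggested here; `DataMapWriteException` is a hypothetical name, not an existing CarbonData class:

```java
// Hypothetical unchecked exception that wraps failures during datamap writing.
// Extending RuntimeException keeps the DataMapWriter callbacks free of
// checked-exception clutter while still preserving the original cause.
public class DataMapWriteException extends RuntimeException {
  public DataMapWriteException(String message, Throwable cause) {
    super(message, cause);
  }
}
```

Callbacks such as `onBlockletEnd` could then catch `IOException` internally and rethrow it as `new DataMapWriteException("failed to write bloom index", e)`.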
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185238634 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java --- @@ -21,142 +21,124 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.util.CarbonUtil; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; /** - * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is - * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that - * belongs to same block will be written to one index file suffixed with .bloomindex. So the number + * BloomDataMap is constructed in CG level (blocklet level). + * For each indexed column, a bloom filter is constructed to indicate whether a value + * belongs to this blocklet. Bloom filter of blocklet that belongs to same block will + * be written to one index file suffixed with .bloomindex. So the number * of bloom index file will be equal to that of the blocks. 
*/ @InterfaceAudience.Internal public class BloomDataMapWriter extends DataMapWriter { - private String dataMapName; - private List<String> indexedColumns; + private static final LogService LOG = LogServiceFactory.getLogService( + BloomDataMapWriter.class.getCanonicalName()); private int bloomFilterSize; - // map column name to ordinal in pages - private Map<String, Integer> col2Ordianl; - private Map<String, DataType> col2DataType; - private String indexShardName; - private int currentBlockletId; + protected int currentBlockletId; private List<String> currentDMFiles; private List<DataOutputStream> currentDataOutStreams; private List<ObjectOutputStream> currentObjectOutStreams; private List<BloomFilter<byte[]>> indexBloomFilters; - @InterfaceAudience.Internal - public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta, - int bloomFilterSize, Segment segment, String writeDirectoryPath) { - super(identifier, segment, writeDirectoryPath); - dataMapName = dataMapMeta.getDataMapName(); - indexedColumns = dataMapMeta.getIndexedColumns(); + BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns, + Segment segment, String shardName, int bloomFilterSize) throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); this.bloomFilterSize = bloomFilterSize; - col2Ordianl = new HashMap<String, Integer>(indexedColumns.size()); - col2DataType = new HashMap<String, DataType>(indexedColumns.size()); - currentDMFiles = new ArrayList<String>(indexedColumns.size()); - currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size()); - currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size()); - - indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size()); + currentDMFiles = new ArrayList<String>(indexColumns.size()); + currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size()); + currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size()); + indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size()); + initDataMapFile(); + resetBloomFilters(); } @Override - public void onBlockStart(String blockId, String indexShardName) throws IOException { - if (this.indexShardName == null) { - this.indexShardName = indexShardName; - initDataMapFile(); - } + public void onBlockStart(String blockId) throws IOException { } @Override public void onBlockEnd(String blockId) throws IOException { - } @Override public void onBlockletStart(int blockletId) { - this.currentBlockletId = blockletId; + } + + protected void resetBloomFilters() { indexBloomFilters.clear(); - for (int i = 0; i < indexedColumns.size(); i++) { + List<CarbonColumn> indexColumns = getIndexColumns(); + for (int i = 0; i < indexColumns.size(); i++) { indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(), bloomFilterSize, 0.00001d)); } } @Override public void onBlockletEnd(int blockletId) { - try { - writeBloomDataMapFile(); - } catch (Exception e) { - for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) { - CarbonUtil.closeStreams(objectOutputStream); - } - for (DataOutputStream dataOutputStream : currentDataOutStreams) { - CarbonUtil.closeStreams(dataOutputStream); - } - throw new RuntimeException(e); - } + writeBloomDataMapFile(); + currentBlockletId++; } - // notice that the input pages only contains the indexed columns @Override - public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) - throws IOException { - 
col2Ordianl.clear(); - col2DataType.clear(); - for (int colId = 0; colId < pages.length; colId++) { - String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase(); - col2Ordianl.put(columnName, colId); - DataType columnType = pages[colId].getColumnSpec().getSchemaDataType(); - col2DataType.put(columnName, columnType); - } + public void addRow(int blockletId, int pageId, int rowId, CarbonRow row) { + addRow(row.getData()); + } - // for each row - for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) { - // for each indexed column - for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) { - String indexedCol = indexedColumns.get(indexColId); - byte[] indexValue; - if (DataTypes.STRING == col2DataType.get(indexedCol) - || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) { - byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId); - indexValue = new byte[originValue.length - 2]; - System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2); - } else { - Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId); - indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue); - } - - indexBloomFilters.get(indexColId).put(indexValue); + protected void addRow(Object[] rowData) { + // for each indexed column, add the data to bloom filter + List<CarbonColumn> indexColumns = getIndexColumns(); + for (int i = 0; i < indexColumns.size(); i++) { + DataType dataType = indexColumns.get(i).getDataType(); + byte[] indexValue; + if (DataTypes.STRING == dataType) { --- End diff -- The procedures for string and byte_array are the same; there is no need to distinguish them. --- |
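A sketch of the merged branch the comment suggests: the per-column loop body in `addRow` could read as below, assuming the 2-byte length prefix applies to both types exactly as in the removed page-based code.

```java
byte[] indexValue;
if (DataTypes.STRING == dataType || DataTypes.BYTE_ARRAY == dataType) {
  // both types arrive as byte[] with a 2-byte length prefix; strip it once here
  byte[] originValue = (byte[]) rowData[i];
  indexValue = new byte[originValue.length - 2];
  System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
} else {
  // other datatypes are converted to bytes via the generic utility
  indexValue = CarbonUtil.getValueAsBytes(dataType, rowData[i]);
}
indexBloomFilters.get(i).put(indexValue);
```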
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2254 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4658/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185394341 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException; /** * Return a new write for this datamap */ - DataMapWriter createWriter(Segment segment, String writeDirectoryPath); + public abstract DataMapWriter createWriter(Segment segment, String shardName) + throws IOException; + + public abstract DataMapRefresher createRefresher(Segment segment, String shardName) + throws IOException; /** * Get the datamap for segmentid */ - List<T> getDataMaps(Segment segment) throws IOException; + public abstract List<T> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. */ - List<T> getDataMaps(DataMapDistributable distributable) + public abstract List<T> getDataMaps(DataMapDistributable distributable) throws IOException; /** * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(Segment segment); + public abstract List<DataMapDistributable> toDistributable(Segment segment); /** * * @param event */ - void fireEvent(Event event); + public abstract void fireEvent(Event event); /** * Clears datamap of the segment */ - void clear(Segment segment); + public abstract void clear(Segment segment); /** * Clear all datamaps from memory */ - void clear(); + public abstract void clear(); /** * Return metadata of this datamap */ - DataMapMeta getMeta(); + public abstract DataMapMeta getMeta(); /** * Type of datamap whether it is FG or CG */ - DataMapLevel getDataMapType(); + public abstract DataMapLevel getDataMapLevel(); /** * delete datamap data if any */ - void deleteDatamapData(); + public abstract void deleteDatamapData(); /** * This function should return true is the input operation enum will make the datamap become stale */ - boolean willBecomeStale(TableOperation operation); + public abstract boolean willBecomeStale(TableOperation operation); + + /** + * Validate INDEX_COLUMNS property and return a array containing index column name + * Following will be validated + * 1. require INDEX_COLUMNS property + * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank) + * 3. INDEX_COLUMNS can't contains duplicate same columns + * 4. 
INDEX_COLUMNS should be exists in table columns + */ + public void validateIndexedColumns(DataMapSchema dataMapSchema, + CarbonTable carbonTable) throws MalformedDataMapCommandException { + List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema); + Set<String> unique = new HashSet<>(); + for (CarbonColumn indexColumn : indexColumns) { + unique.add(indexColumn.getColName()); + } + if (unique.size() != indexColumns.size()) { + throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column"); + } + } + + public static String[] getIndexColumns(DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + String columns = dataMapSchema.getProperties().get(INDEX_COLUMNS); + if (columns == null) { + columns = dataMapSchema.getProperties().get(INDEX_COLUMNS.toLowerCase()); + } + if (columns == null) { + throw new MalformedDataMapCommandException(INDEX_COLUMNS + " DMPROPERTY is required"); + } else if (StringUtils.isBlank(columns)) { --- End diff -- yes --- |
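For reference, the duplicate check shown above could also name the offending column by using the boolean returned from `Set.add` — a sketch, not the code actually in this PR:

```java
Set<String> unique = new HashSet<>();
for (CarbonColumn indexColumn : indexColumns) {
  // Set.add returns false if the element was already present
  if (!unique.add(indexColumn.getColName())) {
    throw new MalformedDataMapCommandException(
        INDEX_COLUMNS + " has duplicate column: " + indexColumn.getColName());
  }
}
```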
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185394494 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException; /** * Return a new write for this datamap */ - DataMapWriter createWriter(Segment segment, String writeDirectoryPath); + public abstract DataMapWriter createWriter(Segment segment, String shardName) + throws IOException; + + public abstract DataMapRefresher createRefresher(Segment segment, String shardName) + throws IOException; /** * Get the datamap for segmentid */ - List<T> getDataMaps(Segment segment) throws IOException; + public abstract List<T> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. */ - List<T> getDataMaps(DataMapDistributable distributable) + public abstract List<T> getDataMaps(DataMapDistributable distributable) throws IOException; /** * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(Segment segment); + public abstract List<DataMapDistributable> toDistributable(Segment segment); /** * * @param event */ - void fireEvent(Event event); + public abstract void fireEvent(Event event); /** * Clears datamap of the segment */ - void clear(Segment segment); + public abstract void clear(Segment segment); /** * Clear all datamaps from memory */ - void clear(); + public abstract void clear(); /** * Return metadata of this datamap */ - DataMapMeta getMeta(); + public abstract DataMapMeta getMeta(); /** * Type of datamap whether it is FG or CG */ - DataMapLevel getDataMapType(); + public abstract DataMapLevel getDataMapLevel(); /** * delete datamap data if any */ - void deleteDatamapData(); + public abstract void deleteDatamapData(); /** * This function should return true is the input operation enum will make the datamap become stale */ - boolean willBecomeStale(TableOperation operation); + public abstract boolean willBecomeStale(TableOperation operation); + + /** + * Validate INDEX_COLUMNS property and return a array containing index column name + * Following will be validated + * 1. require INDEX_COLUMNS property + * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank) + * 3. INDEX_COLUMNS can't contains duplicate same columns + * 4. 
INDEX_COLUMNS should be exists in table columns + */ + public void validateIndexedColumns(DataMapSchema dataMapSchema, --- End diff -- I assume you mean `private`, but it needs to be accessible from IndexDataMapProvider and the Lucene datamap factory --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185394539 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException; /** * Return a new write for this datamap */ - DataMapWriter createWriter(Segment segment, String writeDirectoryPath); + public abstract DataMapWriter createWriter(Segment segment, String shardName) + throws IOException; + + public abstract DataMapRefresher createRefresher(Segment segment, String shardName) + throws IOException; /** * Get the datamap for segmentid */ - List<T> getDataMaps(Segment segment) throws IOException; + public abstract List<T> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. */ - List<T> getDataMaps(DataMapDistributable distributable) + public abstract List<T> getDataMaps(DataMapDistributable distributable) throws IOException; /** * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(Segment segment); + public abstract List<DataMapDistributable> toDistributable(Segment segment); /** * * @param event */ - void fireEvent(Event event); + public abstract void fireEvent(Event event); /** * Clears datamap of the segment */ - void clear(Segment segment); + public abstract void clear(Segment segment); /** * Clear all datamaps from memory */ - void clear(); + public abstract void clear(); /** * Return metadata of this datamap */ - DataMapMeta getMeta(); + public abstract DataMapMeta getMeta(); /** * Type of datamap whether it is FG or CG */ - DataMapLevel getDataMapType(); + public abstract DataMapLevel getDataMapLevel(); /** * delete datamap data if any */ - void deleteDatamapData(); + public abstract void deleteDatamapData(); /** * This function should return true is the input operation enum will make the datamap become stale */ - boolean willBecomeStale(TableOperation operation); + public abstract boolean willBecomeStale(TableOperation operation); + + /** + * Validate INDEX_COLUMNS property and return a array containing index column name + * Following will be validated + * 1. require INDEX_COLUMNS property + * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank) + * 3. INDEX_COLUMNS can't contains duplicate same columns + * 4. 
INDEX_COLUMNS should be exists in table columns + */ + public void validateIndexedColumns(DataMapSchema dataMapSchema, + CarbonTable carbonTable) throws MalformedDataMapCommandException { + List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema); + Set<String> unique = new HashSet<>(); + for (CarbonColumn indexColumn : indexColumns) { + unique.add(indexColumn.getColName()); + } + if (unique.size() != indexColumns.size()) { + throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column"); + } + } + + public static String[] getIndexColumns(DataMapSchema dataMapSchema) --- End diff -- I assume you mean `private`, but it needs to be accessible from the create datamap command --- |
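A hedged sketch of the kind of call site meant here; the surrounding command handler is illustrative, not actual CarbonData code:

```java
// Inside a hypothetical create-datamap command handler:
String[] indexColumnNames = DataMapFactory.getIndexColumns(dataMapSchema);
// The command can validate these names against the table schema before
// any factory instance exists, which is why the method is static
// rather than private.
```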
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2254 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4411/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2254 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5572/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185401780 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapRegistry.java --- @@ -45,14 +51,40 @@ public class DataMapRegistry { private static Map<String, String> shortNameToClassName = new ConcurrentHashMap<>(); - public static void registerDataMap(String datamapClassName, String shortName) { + private static void registerDataMap(String datamapClassName, String shortName) { Objects.requireNonNull(datamapClassName); Objects.requireNonNull(shortName); shortNameToClassName.put(shortName, datamapClassName); } - public static String getDataMapClassName(String shortName) { + private static String getDataMapClassName(String shortName) { Objects.requireNonNull(shortName); return shortNameToClassName.get(shortName); } + + public static DataMapFactory<? extends DataMap> getDataMapByShortName( --- End diff -- The method name should be `getDataMapFactoryByShortName` --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185402531 --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java --- @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId, if (tableIndices != null) { for (TableDataMap tableDataMap : tableIndices) { DataMapFactory factory = tableDataMap.getDataMapFactory(); - register(factory, segmentId, dataWritePath); + register(factory, segmentId, taskNo); } } } /** * Register a DataMapWriter */ - private void register(DataMapFactory factory, String segmentId, String dataWritePath) { + private void register(DataMapFactory factory, String segmentId, String taskNo) { assert (factory != null); assert (segmentId != null); DataMapMeta meta = factory.getMeta(); if (meta == null) { // if data map does not have meta, no need to register return; } - List<String> columns = factory.getMeta().getIndexedColumns(); + List<CarbonColumn> columns = factory.getMeta().getIndexedColumns(); List<DataMapWriter> writers = registry.get(columns); - DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath); + DataMapWriter writer = null; + try { + writer = factory.createWriter(new Segment(segmentId), taskNo); + } catch (IOException e) { + LOG.error(e); --- End diff -- Better to optimize the error message. --- |
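One way to make the message more descriptive — a sketch only; the exact `LogService` overload and the `getDataMapName` accessor are assumptions:

```java
} catch (IOException e) {
  // include enough context (datamap, segment, task) to diagnose the failure
  String message = String.format(
      "Failed to create DataMapWriter for datamap '%s' on segment %s (taskNo %s)",
      meta.getDataMapName(), segmentId, taskNo);
  LOG.error(e, message);
  throw new RuntimeException(message, e);
}
```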
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185403032 --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java --- @@ -130,16 +137,26 @@ public void onBlockletEnd(int blockletId) { * @param tablePage page data */ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException { - Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet(); - for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) { - List<String> indexedColumns = entry.getKey(); + Set<Map.Entry<List<CarbonColumn>, List<DataMapWriter>>> entries = registry.entrySet(); + for (Map.Entry<List<CarbonColumn>, List<DataMapWriter>> entry : entries) { + List<CarbonColumn> indexedColumns = entry.getKey(); ColumnPage[] pages = new ColumnPage[indexedColumns.size()]; for (int i = 0; i < indexedColumns.size(); i++) { - pages[i] = tablePage.getColumnPage(indexedColumns.get(i)); + pages[i] = tablePage.getColumnPage(indexedColumns.get(i).getColName()); } List<DataMapWriter> writers = entry.getValue(); - for (DataMapWriter writer : writers) { - writer.onPageAdded(blockletId, pageId, pages); + int pageSize = pages[0].getPageSize(); + + // add every row in the page to writer + for (int rowId = 0; rowId < pageSize; rowId++) { + Object[] rowData = new Object[indexedColumns.size()]; + for (int k = 0; k < rowData.length; k++) { + rowData[k] = pages[k].getData(rowId); + } + CarbonRow row = new CarbonRow(rowData); --- End diff -- For Line152/156, why can't we use the row in the pages directly? If not, is there a better way to avoid generating these objects? --- |
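If the writer API has to stay row-based, one way to reduce the per-row allocations is to reuse a single buffer for the whole page — a sketch, valid only under the assumptions that `CarbonRow` wraps the array by reference and that no `DataMapWriter` retains the row after `addRow` returns:

```java
// reuse one Object[] and one CarbonRow across all rows of the page
Object[] rowData = new Object[indexedColumns.size()];
CarbonRow row = new CarbonRow(rowData);
for (int rowId = 0; rowId < pageSize; rowId++) {
  for (int k = 0; k < rowData.length; k++) {
    rowData[k] = pages[k].getData(rowId);
  }
  for (DataMapWriter writer : writers) {
    writer.addRow(blockletId, pageId, rowId, row);
  }
}
```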
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185400830 --- Diff: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java --- @@ -494,7 +524,7 @@ public static long getTaskIdFromTaskNo(String taskNo) { } /** - * Return the batch number from taskNo string + * Return the batch number from taskNo stringx --- End diff -- `stringx`? A typo? --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185402546 --- Diff: processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java --- @@ -62,25 +63,31 @@ public void registerAllWriter(CarbonTable carbonTable, String segmentId, if (tableIndices != null) { for (TableDataMap tableDataMap : tableIndices) { DataMapFactory factory = tableDataMap.getDataMapFactory(); - register(factory, segmentId, dataWritePath); + register(factory, segmentId, taskNo); } } } /** * Register a DataMapWriter */ - private void register(DataMapFactory factory, String segmentId, String dataWritePath) { + private void register(DataMapFactory factory, String segmentId, String taskNo) { assert (factory != null); assert (segmentId != null); DataMapMeta meta = factory.getMeta(); if (meta == null) { // if data map does not have meta, no need to register return; } - List<String> columns = factory.getMeta().getIndexedColumns(); + List<CarbonColumn> columns = factory.getMeta().getIndexedColumns(); List<DataMapWriter> writers = registry.get(columns); - DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath); + DataMapWriter writer = null; + try { + writer = factory.createWriter(new Segment(segmentId), taskNo); + } catch (IOException e) { + LOG.error(e); + throw new RuntimeException(e); --- End diff -- As I mentioned above, shall we define a custom exception that extends RTE to wrap the exceptions that may be thrown during DataMapWriter operations? --- |
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185400353 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException; /** * Return a new write for this datamap */ - DataMapWriter createWriter(Segment segment, String writeDirectoryPath); + public abstract DataMapWriter createWriter(Segment segment, String shardName) + throws IOException; + + public abstract DataMapRefresher createRefresher(Segment segment, String shardName) + throws IOException; /** * Get the datamap for segmentid */ - List<T> getDataMaps(Segment segment) throws IOException; + public abstract List<T> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. */ - List<T> getDataMaps(DataMapDistributable distributable) + public abstract List<T> getDataMaps(DataMapDistributable distributable) throws IOException; /** * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(Segment segment); + public abstract List<DataMapDistributable> toDistributable(Segment segment); /** * * @param event */ - void fireEvent(Event event); + public abstract void fireEvent(Event event); /** * Clears datamap of the segment */ - void clear(Segment segment); + public abstract void clear(Segment segment); /** * Clear all datamaps from memory */ - void clear(); + public abstract void clear(); /** * Return metadata of this datamap */ - DataMapMeta getMeta(); + public abstract DataMapMeta getMeta(); /** * Type of datamap whether it is FG or CG */ - DataMapLevel getDataMapType(); + public abstract DataMapLevel getDataMapLevel(); /** * delete datamap data if any */ - void deleteDatamapData(); + public abstract void deleteDatamapData(); /** * This function should return true is the input operation enum will make the datamap become stale */ - boolean willBecomeStale(TableOperation operation); + public abstract boolean willBecomeStale(TableOperation operation); + + /** + * Validate INDEX_COLUMNS property and return a array containing index column name + * Following will be validated + * 1. require INDEX_COLUMNS property + * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank) + * 3. INDEX_COLUMNS can't contains duplicate same columns + * 4. INDEX_COLUMNS should be exists in table columns + */ + public void validateIndexedColumns(DataMapSchema dataMapSchema, --- End diff -- Oh, I thought it can be `protected` at that time. 
Actually, these two methods do not use any fields of this class, so I don't think they belong here. Or should we just make them static? --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2254 @jackylk Please check the review comments. --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2254 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4663/ --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2254 Inspired by the compaction optimization (#2210), the incremental building of datamaps can also be optimized. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185427135 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) --- End diff -- Why is init required for the abstract class? Better to also pass the schema to the constructor and remove this method. --- |
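A sketch of the constructor-injection shape being suggested; field and parameter names are illustrative:

```java
public abstract class DataMapFactory<T extends DataMap> {
  protected final CarbonTable carbonTable;
  protected final DataMapSchema dataMapSchema;

  public DataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
      throws MalformedDataMapCommandException {
    this.carbonTable = carbonTable;
    this.dataMapSchema = dataMapSchema;
    // whatever init(dataMapSchema) used to do can run here
    // or in the subclass constructors
  }
}
```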
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185428212 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -27,74 +29,124 @@ import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.events.Event; +import org.apache.commons.lang.StringUtils; + /** - * Interface for datamap factory, it is responsible for creating the datamap. + * Interface for datamap of index type, it is responsible for creating the datamap. */ -public interface DataMapFactory<T extends DataMap> { +public abstract class DataMapFactory<T extends DataMap> { + + public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + protected CarbonTable carbonTable; + + public DataMapFactory(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } /** * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) + public abstract void init(DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException; /** * Return a new write for this datamap */ - DataMapWriter createWriter(Segment segment, String writeDirectoryPath); + public abstract DataMapWriter createWriter(Segment segment, String shardName) + throws IOException; + + public abstract DataMapRefresher createRefresher(Segment segment, String shardName) + throws IOException; /** * Get the datamap for segmentid */ - List<T> getDataMaps(Segment segment) throws IOException; + public abstract List<T> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. */ - List<T> getDataMaps(DataMapDistributable distributable) + public abstract List<T> getDataMaps(DataMapDistributable distributable) throws IOException; /** * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(Segment segment); + public abstract List<DataMapDistributable> toDistributable(Segment segment); /** * * @param event */ - void fireEvent(Event event); + public abstract void fireEvent(Event event); /** * Clears datamap of the segment */ - void clear(Segment segment); + public abstract void clear(Segment segment); /** * Clear all datamaps from memory */ - void clear(); + public abstract void clear(); /** * Return metadata of this datamap */ - DataMapMeta getMeta(); + public abstract DataMapMeta getMeta(); /** * Type of datamap whether it is FG or CG */ - DataMapLevel getDataMapType(); + public abstract DataMapLevel getDataMapLevel(); /** * delete datamap data if any */ - void deleteDatamapData(); + public abstract void deleteDatamapData(); /** * This function should return true is the input operation enum will make the datamap become stale */ - boolean willBecomeStale(TableOperation operation); + public abstract boolean willBecomeStale(TableOperation operation); + + /** + * Validate INDEX_COLUMNS property and return a array containing index column name + * Following will be validated + * 1. require INDEX_COLUMNS property + * 2. INDEX_COLUMNS can't contains illegal argument(empty, blank) + * 3. INDEX_COLUMNS can't contains duplicate same columns + * 4. 
INDEX_COLUMNS should be exists in table columns + */ + public void validateIndexedColumns(DataMapSchema dataMapSchema, --- End diff -- I think it is not necessary to pass any arguments to it; both the schema and the table are already present here. And better to rename the method to just `validate` --- |
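Combined with storing the schema on the factory (see the constructor sketch above), the method could become argument-free — a sketch assuming a `dataMapSchema` field exists:

```java
public void validate() throws MalformedDataMapCommandException {
  // carbonTable and dataMapSchema are instance fields after construction
  List<CarbonColumn> indexColumns = carbonTable.getIndexedColumns(dataMapSchema);
  Set<String> unique = new HashSet<>();
  for (CarbonColumn indexColumn : indexColumns) {
    unique.add(indexColumn.getColName());
  }
  if (unique.size() != indexColumns.size()) {
    throw new MalformedDataMapCommandException(INDEX_COLUMNS + " has duplicate column");
  }
}
```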
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185429022 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java --- @@ -291,8 +295,8 @@ private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpre } boolean contains = true; for (ColumnExpression expression : columnExpressions) { - if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta - .getOptimizedOperation().containsAll(expressionTypes)) { + if (!mapMeta.getIndexedColumnNames().contains(expression.getColumnName()) || + !mapMeta.getOptimizedOperation().containsAll(expressionTypes)) { --- End diff -- expressionTypes comes from the FilterResolverIntf, i.e. from the expression of the user's query. See the selectDataMap function. --- |