Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769082

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
@@ -114,108 +118,36 @@ public void initialize() throws IOException {
     indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
   }
 
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(directory);
-
-    // delete this ram data
-    directory.close();
-  }
-
-  @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
-      }
-      pageIndexWriter = createPageIndexWriter();
-    }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+  @Override public void addRow(int blockletId, int pageId, int rowId, Object[] values)
+      throws IOException {
     // add other fields
+    LuceneDataMapWriter.LuceneColumnKeys columns =
+        new LuceneDataMapWriter.LuceneColumnKeys(columnsCount);
     for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      CarbonColumn column = indexColumns.get(colIdx);
-      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+      columns.getColValues()[colIdx] = values[colIdx];
+    }
+    if (writeCacheSize > 0) {
+      addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
+      flushCacheIfCan();
+    } else {
+      addData(columns, rowId, pageId, blockletId, intBuffer, indexWriter, indexColumns,
+          storeBlockletWise);
     }
-    pageIndexWriter.addDocument(doc);
   }
 
-  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
-    if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, (int) value));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, (long) value));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, (float) value));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, (double) value));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
+  private void flushCacheIfCan() throws IOException {
+    if (cache.size() > writeCacheSize) {
+      flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
     }
-    return true;
   }
 
-  @Override
-  public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
+  @Override public void finish() throws IOException {
--- End diff --

move override to the previous line
---
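For readers unfamiliar with the convention being requested here and in several comments below: the annotation should sit on its own line above the signature, not share a line with it. A minimal compilable sketch (class name invented for illustration):

    import java.io.IOException;

    class OverridePlacementExample implements AutoCloseable {
      // Flagged style: "@Override public void close() throws IOException {" on one line.
      // Requested style: annotation on the line above the signature.
      @Override
      public void close() throws IOException {
        // release resources here
      }
    }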
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769148

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -175,52 +205,38 @@ public void onBlockletEnd(int blockletId) throws IOException {
    */
   public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
       throws IOException {
+    // save index data into ram, write into disk after one page finished
+    int columnsCount = pages.length;
+    if (columnsCount <= 0) {
+      LOGGER.warn("empty data");
--- End diff --

The log message is too terse for users to diagnose problems in a real environment.
---
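A more diagnosable message would identify the page and blocklet, which is what a later revision of this hunk does (see the onPageAdded diff quoted further down this thread). A self-contained sketch, using java.util.logging as a stand-in for CarbonData's logger:

    import java.util.logging.Logger;

    class EmptyPageWarning {
      private static final Logger LOGGER = Logger.getLogger(EmptyPageWarning.class.getName());

      // Carry enough context (blocklet id, page id) to locate the problem in production.
      static void warnEmptyPage(int blockletId, int pageId) {
        LOGGER.warning("No data in page " + pageId + " of blocklet " + blockletId
            + " to write lucene datamap; skipping page");
      }
    }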
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769085

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
@@ -114,108 +118,36 @@ public void initialize() throws IOException {
[snip: identical to the hunk quoted in the first message above, continuing:]
+  @Override public void finish() throws IOException {
+    flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
   }
 
-  @Override
-  public void close() throws IOException {
+  @Override public void close() throws IOException {
--- End diff --

move override to the previous line
---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769052

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
@@ -66,20 +62,28 @@ private IndexWriter indexWriter = null;
 
-  private IndexWriter pageIndexWriter = null;
-
   private Analyzer analyzer = null;
 
-  LuceneDataMapRefresher(String tablePath, String dataMapName,
-      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
-    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
-        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+  private int writeCacheSize;
+
+  private Map<LuceneDataMapWriter.LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache =
+      new HashMap<>();
+
+  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
+
+  private boolean storeBlockletWise;
+
+  LuceneDataMapRefresher(String tablePath, String dataMapName, Segment segment, String shardName,
+      List<CarbonColumn> indexColumns, int writeCacheSize, boolean storeBlockletWise) {
+    this.dataMapPath = CarbonTablePath
+        .getDataMapStorePathOnShardName(tablePath, segment.getSegmentNo(), dataMapName, shardName);
     this.indexColumns = indexColumns;
     this.columnsCount = indexColumns.size();
+    this.writeCacheSize = writeCacheSize;
+    this.storeBlockletWise = storeBlockletWise;
  }
 
-  @Override
-  public void initialize() throws IOException {
+  @Override public void initialize() throws IOException {
--- End diff --

Move override to the previous line
---
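The two new constructor parameters, writeCacheSize and storeBlockletWise, are parsed from DataMap properties in LuceneDataMapFactoryBase (its hunk is quoted later in this thread). A standalone sketch of the cache-size rule, with the property plumbing simplified:

    class WriteCacheSizeExample {
      // Mirrors validateAndGetWriteCacheSize: a missing value falls back to the
      // default, and a non-numeric value degrades to -1, which disables the
      // write cache (addRow then feeds each row straight to the IndexWriter).
      static int writeCacheSize(String propertyValue, String defaultValue) {
        String raw = propertyValue != null ? propertyValue : defaultValue;
        try {
          return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
          return -1;
        }
      }

      public static void main(String[] args) {
        System.out.println(writeCacheSize(null, "8192"));   // 8192
        System.out.println(writeCacheSize("oops", "8192")); // -1
      }
    }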
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769190

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -242,74 +258,216 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, (int) value));
       }
-    } else if (type == DataTypes.INT) {
+    } else if (key instanceof Integer) {
       // int type , use int point to deal with int type
-      int value = (int) data;
-      doc.add(new IntPoint(fieldName, value));
+      int value = (Integer) key;
+      doc.add(new IntPoint(fieldName, new int[] { value }));
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
       }
-    } else if (type == DataTypes.LONG) {
+    } else if (key instanceof Long) {
       // long type , use long point to deal with long type
-      long value = (long) data;
-      doc.add(new LongPoint(fieldName, value));
+      long value = (Long) key;
+      doc.add(new LongPoint(fieldName, new long[] { value }));
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
       }
-    } else if (type == DataTypes.FLOAT) {
-      float value = (float) data;
-      doc.add(new FloatPoint(fieldName, value));
+    } else if (key instanceof Float) {
+      float value = (Float) key;
+      doc.add(new FloatPoint(fieldName, new float[] { value }));
       if (store == Field.Store.YES) {
         doc.add(new FloatPoint(fieldName, value));
       }
-    } else if (type == DataTypes.DOUBLE) {
-      double value = (double) data;
-      doc.add(new DoublePoint(fieldName, value));
+    } else if (key instanceof Double) {
+      double value = (Double) key;
+      doc.add(new DoublePoint(fieldName, new double[] { value }));
       if (store == Field.Store.YES) {
         doc.add(new DoublePoint(fieldName, value));
       }
+    } else if (key instanceof String) {
+      String strValue = (String) key;
+      doc.add(new TextField(fieldName, strValue, store));
+    } else if (key instanceof Boolean) {
+      boolean value = (Boolean) key;
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue(value ? 1 : 0);
+      doc.add(field);
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value ? 1 : 0));
+      }
+    }
+    return true;
+  }
+
+  private Object getValue(ColumnPage page, int rowId) {
+
+    //get field type
+    DataType type = page.getColumnSpec().getSchemaDataType();
+    Object value = null;
+    if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      value = page.getByte(rowId);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no short type
+      value = page.getShort(rowId);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      value = page.getInt(rowId);
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      value = page.getLong(rowId);
+    } else if (type == DataTypes.FLOAT) {
+      value = page.getFloat(rowId);
+    } else if (type == DataTypes.DOUBLE) {
+      value = page.getDouble(rowId);
     } else if (type == DataTypes.STRING) {
-      byte[] value = (byte[]) data;
+      byte[] bytes = page.getBytes(rowId);
       // TODO: how to get string value
-      String strValue = null;
       try {
-        strValue = new String(value, 2, value.length - 2, "UTF-8");
+        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
       } catch (UnsupportedEncodingException e) {
         throw new RuntimeException(e);
       }
-      doc.add(new TextField(fieldName, strValue, store));
     } else if (type == DataTypes.DATE) {
       throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.TIMESTAMP) {
       throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.BOOLEAN) {
-      boolean value = (boolean) data;
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue(value ? 1 : 0);
-      doc.add(field);
-      if (store == Field.Store.YES) {
-        doc.add(new StoredField(fieldName, value ? 1 : 0));
-      }
+      value = page.getBoolean(rowId);
     } else {
      LOGGER.error("unsupport data type " + type);
       throw new RuntimeException("unsupported data type " + type);
     }
-    return true;
+    return value;
+  }
+
+  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
+      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
+      boolean storeBlockletWise) {
+    Map<Integer, RoaringBitmap> setMap = cache.get(key);
+    if (setMap == null) {
+      setMap = new HashMap<>();
+      cache.put(key, setMap);
+    }
+    int combinKey;
+    if (!storeBlockletWise) {
+      intBuffer.clear();
+      intBuffer.putShort((short) blockletId);
+      intBuffer.putShort((short) pageId);
+      intBuffer.rewind();
+      combinKey = intBuffer.getInt();
+    } else {
+      combinKey = pageId;
+    }
+    RoaringBitmap bitSet = setMap.get(combinKey);
+    if (bitSet == null) {
+      bitSet = new RoaringBitmap();
+      setMap.put(combinKey, bitSet);
+    }
+    bitSet.add(rowId);
+  }
+
+  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
+      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
+      boolean storeBlockletWise) throws IOException {
+
+    Document document = new Document();
+    for (int i = 0; i < key.getColValues().length; i++) {
+      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
+    }
+    intBuffer.clear();
+    if (storeBlockletWise) {
+      // No need to store blocklet id to it.
+      intBuffer.putShort((short) pageId);
+      intBuffer.putShort((short) rowId);
+      intBuffer.rewind();
+      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
+    } else {
+      intBuffer.putShort((short) blockletId);
+      intBuffer.putShort((short) pageId);
+      intBuffer.rewind();
+      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
+      document.add(new StoredField(ROWID_NAME, (short) rowId));
+    }
+    indexWriter.addDocument(document);
+  }
+
+  private void flushCacheIfCan() throws IOException {
--- End diff --

better to name it `flushCacheIfPossible`
---
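A note on the combinKey trick in the addToCache hunk above: when rows are not stored blocklet-wise, the 16-bit blocklet id and 16-bit page id are packed into one int map key through the shared ByteBuffer. A self-contained sketch of just that packing:

    import java.nio.ByteBuffer;

    class CombinedKeyExample {
      // Equivalent to (blockletId << 16) | (pageId & 0xFFFF), written with a
      // big-endian ByteBuffer the way addToCache does it.
      static int combine(int blockletId, int pageId) {
        ByteBuffer intBuffer = ByteBuffer.allocate(4);
        intBuffer.putShort((short) blockletId);  // high 16 bits
        intBuffer.putShort((short) pageId);      // low 16 bits
        intBuffer.rewind();
        return intBuffer.getInt();
      }

      public static void main(String[] args) {
        System.out.println(Integer.toHexString(combine(3, 7)));  // prints 30007
      }
    }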
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769065

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
@@ -114,108 +118,36 @@ public void initialize() throws IOException {
[snip: identical to the hunk quoted in the first message above, up to:]
+  @Override public void addRow(int blockletId, int pageId, int rowId, Object[] values)
--- End diff --

move override to the previous line
---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769203

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -242,74 +258,216 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
[snip: identical to the hunk quoted in the previous message, continuing:]
+  private void flushCacheIfCan() throws IOException {
+    if (cache.size() > cacheSize) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+    }
+  }
+
+  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
+      List<CarbonColumn> indexCols, IndexWriter indexWriter, boolean storeBlockletWise)
+      throws IOException {
+    for (Map.Entry<LuceneColumnKeys, Map<Integer, RoaringBitmap>> entry : cache.entrySet()) {
+      Document document = new Document();
+      LuceneColumnKeys key = entry.getKey();
+      for (int i = 0; i < key.getColValues().length; i++) {
+        addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
+      }
+      Map<Integer, RoaringBitmap> value = entry.getValue();
+      int count = 0;
+      for (Map.Entry<Integer, RoaringBitmap> pageData : value.entrySet()) {
+        RoaringBitmap bitMap = pageData.getValue();
+        int cardinality = bitMap.getCardinality();
+        // Each row is short and pageid is stored in int
+        ByteBuffer byteBuffer = ByteBuffer.allocate(cardinality * 2 + 4);
+        if (!storeBlockletWise) {
+          byteBuffer.putInt(pageData.getKey());
+        } else {
+          byteBuffer.putShort(pageData.getKey().shortValue());
+        }
+        IntIterator intIterator = bitMap.getIntIterator();
+        while (intIterator.hasNext()) {
+          byteBuffer.putShort((short) intIterator.next());
+        }
+        document.add(new StoredField(PAGEID_NAME + count, byteBuffer.array()));
+        count++;
+      }
+      indexWriter.addDocument(document);
+    }
+    cache.clear();
+  }
 
   /**
    * This is called during closing of writer.So after this call no more data will be sent to this
    * class.
    */
   public void finish() throws IOException {
+    flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
     // finished a file , close this index writer
     if (indexWriter != null) {
       indexWriter.close();
     }
   }
 
+  public static class LuceneColumnKeys {
+
+    private Object[] colValues;
+
+    public LuceneColumnKeys(int size) {
+      colValues = new Object[size];
+    }
+
+    public Object[] getColValues() {
+      return colValues;
+    }
+
+    @Override public boolean equals(Object o) {
--- End diff --

move override to the previous line; besides, there are others in this class.
---
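Because LuceneColumnKeys is used as a HashMap key, equals and hashCode must be value-based over colValues. The quote above cuts off at the equals signature; a typical Arrays-based pair, with the annotations placed as the reviewer asks (a sketch, not necessarily the PR's exact bodies):

    import java.util.Arrays;

    class ColumnKeysSketch {
      private final Object[] colValues;

      ColumnKeysSketch(int size) {
        colValues = new Object[size];
      }

      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (!(o instanceof ColumnKeysSketch)) {
          return false;
        }
        return Arrays.equals(colValues, ((ColumnKeysSketch) o).colValues);
      }

      @Override
      public int hashCode() {
        return Arrays.hashCode(colValues);
      }
    }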
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2275

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4913/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2275

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4722/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2275

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5876/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2275

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4987/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2275

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5961/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2275

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4803/
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189419746

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---
@@ -212,69 +228,58 @@ private int getMaxDoc(Expression expression) {
       LOGGER.error(errorMessage);
       return null;
     }
-
-    // execute index search
-    // initialize to null, else ScoreDoc objects will get accumulated in memory
-    TopDocs result = null;
-    try {
-      result = indexSearcher.search(query, maxDocs);
-    } catch (IOException e) {
-      String errorMessage =
-          String.format("failed to search lucene data, detail is %s", e.getMessage());
-      LOGGER.error(errorMessage);
-      throw new IOException(errorMessage);
-    }
-
     // temporary data, delete duplicated data
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
-    Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>();
-
-    for (ScoreDoc scoreDoc : result.scoreDocs) {
-      // get a document
-      Document doc = indexSearcher.doc(scoreDoc.doc);
+    Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
+
+    for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
+      IndexSearcher indexSearcher = searcherEntry.getValue();
+      // execute index search
+      // initialize to null, else ScoreDoc objects will get accumulated in memory
+      TopDocs result = null;
+      try {
+        result = indexSearcher.search(query, maxDocs);
+      } catch (IOException e) {
+        String errorMessage =
+            String.format("failed to search lucene data, detail is %s", e.getMessage());
+        LOGGER.error(errorMessage);
+        throw new IOException(errorMessage);
+      }
 
-      // get all fields
-      List<IndexableField> fieldsInDoc = doc.getFields();
+      ByteBuffer intBuffer = ByteBuffer.allocate(4);
 
-      // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
-      String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
-      Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId);
-      if (mapPageIds == null) {
-        mapPageIds = new HashMap<>();
-        mapBlocks.put(blockletId, mapPageIds);
-      }
+      for (ScoreDoc scoreDoc : result.scoreDocs) {
+        // get a document
+        Document doc = indexSearcher.doc(scoreDoc.doc);
 
-      // get the page id Map<PageId, Set<RowId>>
-      Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
-      Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
-      if (setRowId == null) {
-        setRowId = new HashSet<>();
-        mapPageIds.put(pageId.intValue(), setRowId);
+        // get all fields
+        List<IndexableField> fieldsInDoc = doc.getFields();
+        if (writeCacheSize > 0) {
+          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
--- End diff --

please add a comment here to describe fillMap and fillMapDirect
---
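One possible shape for the requested documentation, inferred from the surrounding code (the wording is the editor's, not the PR author's):

    class FillMapDocSketch {
      /**
       * Decodes the stored fields of one matched Lucene document into the
       * temporary blockId -> blockletId/pageId -> rowIds map. This variant is
       * used when the datamap was written with a write cache, so a single
       * document packs many row ids; fillMapDirect would handle the uncached
       * layout, where each document stores one (pageId, rowId) pair.
       */
      void fillMap() {
        // see LuceneFineGrainDataMap in the PR for the real implementation
      }
    }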
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189419824

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -242,73 +260,220 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
[snip: same hunk as quoted earlier in this thread, here with flushCacheIfCan renamed, continuing:]
+  private void flushCacheIfPossible() throws IOException {
+    if (cache.size() > cacheSize) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+    }
+  }
+
+  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
--- End diff --

It is not required to be public
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189419987

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -84,20 +91,61 @@ public static final String ROWID_NAME = "rowId";
 
+  private Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache = new HashMap<>();
+
+  private int cacheSize;
+
+  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
+
+  private boolean storeBlockletWise;
+
   LuceneDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, boolean isFineGrain) {
+      Segment segment, String shardName, boolean isFineGrain, int flushSize,
+      boolean storeBlockletWise) {
     super(tablePath, dataMapName, indexColumns, segment, shardName);
     this.isFineGrain = isFineGrain;
--- End diff --

It seems isFineGrain, indexShardName and BLOCKLETID_NAME are not used
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189420060

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---
@@ -107,13 +138,39 @@ public LuceneDataMapFactoryBase(CarbonTable carbonTable, DataMapSchema dataMapSc
     // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
     // optimizedOperations.add(ExpressionType.NOT);
     optimizedOperations.add(ExpressionType.TEXT_MATCH);
-    this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
-
+    this.dataMapMeta = new DataMapMeta(indexedCarbonColumns, optimizedOperations);
     // get analyzer
     // TODO: how to get analyzer ?
     analyzer = new StandardAnalyzer();
   }
 
+  public static int validateAndGetWriteCacheSize(DataMapSchema schema) {
+    String cacheStr = schema.getProperties().get(FLUSH_CACHE);
+    if (cacheStr == null) {
+      cacheStr = FLUSH_CACHE_DEFAULT_SIZE;
+    }
+    int cacheSize;
+    try {
+      cacheSize = Integer.parseInt(cacheStr);
+    } catch (NumberFormatException e) {
+      cacheSize = -1;
+    }
+    return cacheSize;
+  }
+
+  public static boolean validateAndGetStoreBlockletWise(DataMapSchema schema) {
+    String splitBlockletStr = schema.getProperties().get(SPLIT_BLOCKLET);
+    if (splitBlockletStr == null) {
+      splitBlockletStr = SPLIT_BLOCKLET_DEFAULT;
+    }
+    boolean splitBlockletWise;
+    try {
+      splitBlockletWise = Boolean.parseBoolean(splitBlockletStr);
+    } catch (NumberFormatException e) {
+      splitBlockletWise = false;
--- End diff --

I think it is better to make the default true, to make the Lucene index smaller
---
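Two details of validateAndGetStoreBlockletWise are worth spelling out: the default the reviewer wants flipped, and the fact that Boolean.parseBoolean never throws NumberFormatException, so the catch block above is unreachable and any unparseable value silently becomes false. A sketch with the suggested default (the property key is passed in, since the string value of SPLIT_BLOCKLET is not shown here):

    import java.util.Map;

    class SplitBlockletExample {
      static boolean storeBlockletWise(Map<String, String> properties, String splitBlockletKey) {
        String raw = properties.get(splitBlockletKey);
        if (raw == null) {
          return true;  // reviewer-suggested default: smaller Lucene index
        }
        // parseBoolean cannot throw; anything but "true" (case-insensitive) is false
        return Boolean.parseBoolean(raw);
      }
    }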
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189420105

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -175,52 +206,39 @@ public void onBlockletEnd(int blockletId) throws IOException {
    */
   public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
       throws IOException {
+    // save index data into ram, write into disk after one page finished
+    int columnsCount = pages.length;
+    if (columnsCount <= 0) {
+      LOGGER.warn("No data in the page " + pageId + "with blockletid " + blockletId
+          + " to write lucene datamap");
+      return;
+    }
     for (int rowId = 0; rowId < pageSize; rowId++) {
-      // create a new document
-      Document doc = new Document();
-      // add blocklet Id
-      doc.add(new IntPoint(BLOCKLETID_NAME, blockletId));
-      doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
-      //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
-
-      // add page id and row id in Fine Grain data map
-      if (isFineGrain) {
-        // add page Id
-        doc.add(new IntPoint(PAGEID_NAME, pageId));
-        doc.add(new StoredField(PAGEID_NAME, pageId));
-        //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
-
-        // add row id
-        doc.add(new IntPoint(ROWID_NAME, rowId));
-        doc.add(new StoredField(ROWID_NAME, rowId));
-        //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
-      }
-
-      // add indexed columns value into the document
-      List<CarbonColumn> indexColumns = getIndexColumns();
-      for (int i = 0; i < pages.length; i++) {
-        // add to lucene only if value is not null
-        if (!pages[i].getNullBits().get(rowId)) {
-          addField(doc, pages[i].getData(rowId), indexColumns.get(i), Field.Store.NO);
+      LuceneColumnKeys columns = new LuceneColumnKeys(getIndexColumns().size());
+      int i = 0;
+      for (ColumnPage page : pages) {
+        if (!page.getNullBits().get(rowId)) {
+          columns.colValues[i++] = getValue(page, rowId);
         }
       }
-
-      // add this document
-      ramIndexWriter.addDocument(doc);
+      if (cacheSize > 0) {
+        addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
+      } else {
+        addData(columns, rowId, pageId, blockletId, intBuffer, ramIndexWriter, getIndexColumns(),
+            storeBlockletWise);
+      }
+    }
+    if (cacheSize > 0) {
+      flushCacheIfPossible();
     }
   }
 
-  private boolean addField(Document doc, Object data, CarbonColumn column, Field.Store store) {
+  private static boolean addField(Document doc, Object key, String fieldName, Field.Store store) {
--- End diff --

This method always returns true, and the return value is not used by the caller
---
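The change the comment points at, in miniature: when a method can only add its field or throw, a constant true return tells callers nothing, and void states the contract better (illustrative names, not the PR's code):

    class ReturnContractExample {
      // Before: the result is always true, so every caller must ignore it.
      static boolean appendFieldOld(StringBuilder doc, String name, Object value) {
        if (value == null) {
          throw new IllegalArgumentException("unsupported null value for " + name);
        }
        doc.append(name).append('=').append(value).append(';');
        return true;
      }

      // After: success is implied by normal return, failure by the exception.
      static void appendField(StringBuilder doc, String name, Object value) {
        if (value == null) {
          throw new IllegalArgumentException("unsupported null value for " + name);
        }
        doc.append(name).append('=').append(value).append(';');
      }
    }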
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189439711

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
@@ -242,73 +260,220 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
[snip: same hunk as quoted in the previous message, up to:]
+  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
--- End diff --

It is required as `LuceneDataMapBuilder` calls it
---
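For readers following this exchange: flushCache stays public because LuceneDataMapBuilder, outside this class, reuses it. Its core job is serializing each page's row-id bitmap into the byte[] stored on the Lucene document; a standalone sketch of that packing, using the RoaringBitmap calls that appear in the quoted diff:

    import java.nio.ByteBuffer;
    import org.roaringbitmap.IntIterator;
    import org.roaringbitmap.RoaringBitmap;

    class RowIdPackingExample {
      // A 4-byte combined blocklet/page key followed by one short per row id,
      // which is why the quoted flushCache allocates cardinality * 2 + 4 bytes.
      static byte[] pack(int combinedPageKey, RoaringBitmap rows) {
        ByteBuffer buffer = ByteBuffer.allocate(rows.getCardinality() * 2 + 4);
        buffer.putInt(combinedPageKey);
        IntIterator it = rows.getIntIterator();
        while (it.hasNext()) {
          buffer.putShort((short) it.next());  // row ids are assumed to fit in 16 bits
        }
        return buffer.array();
      }

      public static void main(String[] args) {
        RoaringBitmap rows = RoaringBitmap.bitmapOf(1, 5, 9);
        System.out.println(pack(0x00030007, rows).length);  // 4 + 3 * 2 = 10
      }
    }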
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r189439715

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---
@@ -212,69 +228,58 @@ private int getMaxDoc(Expression expression) {
[snip: same hunk as quoted in the earlier LuceneFineGrainDataMap message, up to:]
+        if (writeCacheSize > 0) {
+          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
--- End diff --

ok
---