[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance


[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769082
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
    @@ -114,108 +118,36 @@ public void initialize() throws IOException {
         indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
       }
     
    -  private IndexWriter createPageIndexWriter() throws IOException {
    -    // save index data into ram, write into disk after one page finished
    -    RAMDirectory ramDir = new RAMDirectory();
    -    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
    -  }
    -
    -  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
    -
    -    Directory directory = pageIndexWriter.getDirectory();
    -
    -    // close ram writer
    -    pageIndexWriter.close();
    -
    -    // add ram index data into disk
    -    indexWriter.addIndexes(directory);
    -
    -    // delete this ram data
    -    directory.close();
    -  }
    -
    -  @Override
    -  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
    -    if (rowId == 0) {
    -      if (pageIndexWriter != null) {
    -        addPageIndex(pageIndexWriter);
    -      }
    -      pageIndexWriter = createPageIndexWriter();
    -    }
    -
    -    // create a new document
    -    Document doc = new Document();
    -
    -    // add blocklet Id
    -    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -
    -    // add page id
    -    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -
    -    // add row id
    -    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
    -    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
    +  @Override public void addRow(int blockletId, int pageId, int rowId, Object[] values)
    +      throws IOException {
     
         // add other fields
    +    LuceneDataMapWriter.LuceneColumnKeys columns =
    +        new LuceneDataMapWriter.LuceneColumnKeys(columnsCount);
         for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
    -      CarbonColumn column = indexColumns.get(colIdx);
    -      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
    +      columns.getColValues()[colIdx] = values[colIdx];
    +    }
    +    if (writeCacheSize > 0) {
    +      addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
    +      flushCacheIfCan();
    +    } else {
    +      addData(columns, rowId, pageId, blockletId, intBuffer, indexWriter, indexColumns,
    +          storeBlockletWise);
         }
     
    -    pageIndexWriter.addDocument(doc);
       }
     
    -  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
    -    if (type == DataTypes.STRING) {
    -      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
    -    } else if (type == DataTypes.BYTE) {
    -      // byte type , use int range to deal with byte, lucene has no byte type
    -      IntRangeField field =
    -          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
    -      field.setIntValue((int) value);
    -      doc.add(field);
    -    } else if (type == DataTypes.SHORT) {
    -      // short type , use int range to deal with short type, lucene has no short type
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
    -          new int[] { Short.MAX_VALUE });
    -      field.setShortValue((short) value);
    -      doc.add(field);
    -    } else if (type == DataTypes.INT) {
    -      // int type , use int point to deal with int type
    -      doc.add(new IntPoint(fieldName, (int) value));
    -    } else if (type == DataTypes.LONG) {
    -      // long type , use long point to deal with long type
    -      doc.add(new LongPoint(fieldName, (long) value));
    -    } else if (type == DataTypes.FLOAT) {
    -      doc.add(new FloatPoint(fieldName, (float) value));
    -    } else if (type == DataTypes.DOUBLE) {
    -      doc.add(new DoublePoint(fieldName, (double) value));
    -    } else if (type == DataTypes.DATE) {
    -      // TODO: how to get data value
    -    } else if (type == DataTypes.TIMESTAMP) {
    -      // TODO: how to get
    -    } else if (type == DataTypes.BOOLEAN) {
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue((boolean) value ? 1 : 0);
    -      doc.add(field);
    -    } else {
    -      LOGGER.error("unsupport data type " + type);
    -      throw new RuntimeException("unsupported data type " + type);
    +  private void flushCacheIfCan() throws IOException {
    +    if (cache.size() > writeCacheSize) {
    +      flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
         }
    -    return true;
       }
     
    -  @Override
    -  public void finish() throws IOException {
    -    if (indexWriter != null && pageIndexWriter != null) {
    -      addPageIndex(pageIndexWriter);
    -    }
    +  @Override public void finish() throws IOException {
    --- End diff --
   
    move override to the previous line
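    For reference, the requested layout keeps the annotation on its own line, e.g. (a minimal sketch based on the diff above, not the actual patch):

        @Override
        public void finish() throws IOException {
          flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
        }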


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769148
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -175,52 +205,38 @@ public void onBlockletEnd(int blockletId) throws IOException {
        */
       public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
           throws IOException {
    +    // save index data into ram, write into disk after one page finished
    +    int columnsCount = pages.length;
    +    if (columnsCount <= 0) {
    +      LOGGER.warn("empty data");
    --- End diff --
   
    This log message is too terse for users to locate problems in a real environment.
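    A more actionable warning would include the identifiers, e.g. (a sketch of the direction, assuming pageId and blockletId are in scope at this point):

        LOGGER.warn("No data in page " + pageId + " with blockletId " + blockletId
            + " to write lucene datamap");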


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769085
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
    @@ -114,108 +118,36 @@ public void initialize() throws IOException {
         indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
       }
     
    -  private IndexWriter createPageIndexWriter() throws IOException {
    -    // save index data into ram, write into disk after one page finished
    -    RAMDirectory ramDir = new RAMDirectory();
    -    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
    -  }
    -
    -  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
    -
    -    Directory directory = pageIndexWriter.getDirectory();
    -
    -    // close ram writer
    -    pageIndexWriter.close();
    -
    -    // add ram index data into disk
    -    indexWriter.addIndexes(directory);
    -
    -    // delete this ram data
    -    directory.close();
    -  }
    -
    -  @Override
    -  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
    -    if (rowId == 0) {
    -      if (pageIndexWriter != null) {
    -        addPageIndex(pageIndexWriter);
    -      }
    -      pageIndexWriter = createPageIndexWriter();
    -    }
    -
    -    // create a new document
    -    Document doc = new Document();
    -
    -    // add blocklet Id
    -    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -
    -    // add page id
    -    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -
    -    // add row id
    -    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
    -    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
    +  @Override public void addRow(int blockletId, int pageId, int rowId, Object[] values)
    +      throws IOException {
     
         // add other fields
    +    LuceneDataMapWriter.LuceneColumnKeys columns =
    +        new LuceneDataMapWriter.LuceneColumnKeys(columnsCount);
         for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
    -      CarbonColumn column = indexColumns.get(colIdx);
    -      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
    +      columns.getColValues()[colIdx] = values[colIdx];
    +    }
    +    if (writeCacheSize > 0) {
    +      addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
    +      flushCacheIfCan();
    +    } else {
    +      addData(columns, rowId, pageId, blockletId, intBuffer, indexWriter, indexColumns,
    +          storeBlockletWise);
         }
     
    -    pageIndexWriter.addDocument(doc);
       }
     
    -  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
    -    if (type == DataTypes.STRING) {
    -      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
    -    } else if (type == DataTypes.BYTE) {
    -      // byte type , use int range to deal with byte, lucene has no byte type
    -      IntRangeField field =
    -          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
    -      field.setIntValue((int) value);
    -      doc.add(field);
    -    } else if (type == DataTypes.SHORT) {
    -      // short type , use int range to deal with short type, lucene has no short type
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
    -          new int[] { Short.MAX_VALUE });
    -      field.setShortValue((short) value);
    -      doc.add(field);
    -    } else if (type == DataTypes.INT) {
    -      // int type , use int point to deal with int type
    -      doc.add(new IntPoint(fieldName, (int) value));
    -    } else if (type == DataTypes.LONG) {
    -      // long type , use long point to deal with long type
    -      doc.add(new LongPoint(fieldName, (long) value));
    -    } else if (type == DataTypes.FLOAT) {
    -      doc.add(new FloatPoint(fieldName, (float) value));
    -    } else if (type == DataTypes.DOUBLE) {
    -      doc.add(new DoublePoint(fieldName, (double) value));
    -    } else if (type == DataTypes.DATE) {
    -      // TODO: how to get data value
    -    } else if (type == DataTypes.TIMESTAMP) {
    -      // TODO: how to get
    -    } else if (type == DataTypes.BOOLEAN) {
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue((boolean) value ? 1 : 0);
    -      doc.add(field);
    -    } else {
    -      LOGGER.error("unsupport data type " + type);
    -      throw new RuntimeException("unsupported data type " + type);
    +  private void flushCacheIfCan() throws IOException {
    +    if (cache.size() > writeCacheSize) {
    +      flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
         }
    -    return true;
       }
     
    -  @Override
    -  public void finish() throws IOException {
    -    if (indexWriter != null && pageIndexWriter != null) {
    -      addPageIndex(pageIndexWriter);
    -    }
    +  @Override public void finish() throws IOException {
    +    flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
       }
     
    -  @Override
    -  public void close() throws IOException {
    +  @Override public void close() throws IOException {
    --- End diff --
   
    move override to the previous line


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769052
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
    @@ -66,20 +62,28 @@
     
       private IndexWriter indexWriter = null;
     
    -  private IndexWriter pageIndexWriter = null;
    -
       private Analyzer analyzer = null;
     
    -  LuceneDataMapRefresher(String tablePath, String dataMapName,
    -      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
    -    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
    -        tablePath, segment.getSegmentNo(), dataMapName, shardName);
    +  private int writeCacheSize;
    +
    +  private Map<LuceneDataMapWriter.LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache =
    +      new HashMap<>();
    +
    +  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
    +
    +  private boolean storeBlockletWise;
    +
    +  LuceneDataMapRefresher(String tablePath, String dataMapName, Segment segment, String shardName,
    +      List<CarbonColumn> indexColumns, int writeCacheSize, boolean storeBlockletWise) {
    +    this.dataMapPath = CarbonTablePath
    +        .getDataMapStorePathOnShardName(tablePath, segment.getSegmentNo(), dataMapName, shardName);
         this.indexColumns = indexColumns;
         this.columnsCount = indexColumns.size();
    +    this.writeCacheSize = writeCacheSize;
    +    this.storeBlockletWise = storeBlockletWise;
       }
     
    -  @Override
    -  public void initialize() throws IOException {
    +  @Override public void initialize() throws IOException {
    --- End diff --
   
    Move override to the previous line


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769190
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -242,74 +258,216 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, (int) value));
           }
    -    } else if (type == DataTypes.INT) {
    +    } else if (key instanceof Integer) {
           // int type , use int point to deal with int type
    -      int value = (int) data;
    -      doc.add(new IntPoint(fieldName, value));
    +      int value = (Integer) key;
    +      doc.add(new IntPoint(fieldName, new int[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.LONG) {
    +    } else if (key instanceof Long) {
           // long type , use long point to deal with long type
    -      long value = (long) data;
    -      doc.add(new LongPoint(fieldName, value));
    +      long value = (Long) key;
    +      doc.add(new LongPoint(fieldName, new long[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.FLOAT) {
    -      float value = (float) data;
    -      doc.add(new FloatPoint(fieldName, value));
    +    } else if (key instanceof Float) {
    +      float value = (Float) key;
    +      doc.add(new FloatPoint(fieldName, new float[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new FloatPoint(fieldName, value));
           }
    -    } else if (type == DataTypes.DOUBLE) {
    -      double value = (double) data;
    -      doc.add(new DoublePoint(fieldName, value));
    +    } else if (key instanceof Double) {
    +      double value = (Double) key;
    +      doc.add(new DoublePoint(fieldName, new double[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new DoublePoint(fieldName, value));
           }
    +    } else if (key instanceof String) {
    +      String strValue = (String) key;
    +      doc.add(new TextField(fieldName, strValue, store));
    +    } else if (key instanceof Boolean) {
    +      boolean value = (Boolean) key;
    +      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    +      field.setIntValue(value ? 1 : 0);
    +      doc.add(field);
    +      if (store == Field.Store.YES) {
    +        doc.add(new StoredField(fieldName, value ? 1 : 0));
    +      }
    +    }
    +    return true;
    +  }
    +
    +  private Object getValue(ColumnPage page, int rowId) {
    +
    +    //get field type
    +    DataType type = page.getColumnSpec().getSchemaDataType();
    +    Object value = null;
    +    if (type == DataTypes.BYTE) {
    +      // byte type , use int range to deal with byte, lucene has no byte type
    +      value = page.getByte(rowId);
    +    } else if (type == DataTypes.SHORT) {
    +      // short type , use int range to deal with short type, lucene has no short type
    +      value = page.getShort(rowId);
    +    } else if (type == DataTypes.INT) {
    +      // int type , use int point to deal with int type
    +      value = page.getInt(rowId);
    +    } else if (type == DataTypes.LONG) {
    +      // long type , use long point to deal with long type
    +      value = page.getLong(rowId);
    +    } else if (type == DataTypes.FLOAT) {
    +      value = page.getFloat(rowId);
    +    } else if (type == DataTypes.DOUBLE) {
    +      value = page.getDouble(rowId);
         } else if (type == DataTypes.STRING) {
    -      byte[] value = (byte[]) data;
    +      byte[] bytes = page.getBytes(rowId);
           // TODO: how to get string value
    -      String strValue = null;
           try {
    -        strValue = new String(value, 2, value.length - 2, "UTF-8");
    +        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
           } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
           }
    -      doc.add(new TextField(fieldName, strValue, store));
         } else if (type == DataTypes.DATE) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.TIMESTAMP) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.BOOLEAN) {
    -      boolean value = (boolean) data;
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue(value ? 1 : 0);
    -      doc.add(field);
    -      if (store == Field.Store.YES) {
    -        doc.add(new StoredField(fieldName, value ? 1 : 0));
    -      }
    +      value = page.getBoolean(rowId);
         } else {
           LOGGER.error("unsupport data type " + type);
           throw new RuntimeException("unsupported data type " + type);
         }
    -    return true;
    +    return value;
    +  }
    +
    +  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
    +      boolean storeBlockletWise) {
    +    Map<Integer, RoaringBitmap> setMap = cache.get(key);
    +    if (setMap == null) {
    +      setMap = new HashMap<>();
    +      cache.put(key, setMap);
    +    }
    +    int combinKey;
    +    if (!storeBlockletWise) {
    +      intBuffer.clear();
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      combinKey = intBuffer.getInt();
    +    } else {
    +      combinKey = pageId;
    +    }
    +    RoaringBitmap bitSet = setMap.get(combinKey);
    +    if (bitSet == null) {
    +      bitSet = new RoaringBitmap();
    +      setMap.put(combinKey, bitSet);
    +    }
    +    bitSet.add(rowId);
    +  }
    +
    +  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
    +      boolean storeBlockletWise) throws IOException {
    +
    +    Document document = new Document();
    +    for (int i = 0; i < key.getColValues().length; i++) {
    +      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
    +    }
    +    intBuffer.clear();
    +    if (storeBlockletWise) {
    +      // No need to store blocklet id to it.
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.putShort((short) rowId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
    +    } else {
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
    +      document.add(new StoredField(ROWID_NAME, (short) rowId));
    +    }
    +    indexWriter.addDocument(document);
    +  }
    +
    +  private void flushCacheIfCan() throws IOException {
    --- End diff --
   
    better to name it `flushCacheIfPossible`
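    For example (a minimal sketch of the rename, mirroring the diff above):

        private void flushCacheIfPossible() throws IOException {
          if (cache.size() > writeCacheSize) {
            flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
          }
        }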


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769065
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java ---
    @@ -114,108 +118,36 @@ public void initialize() throws IOException {
         indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
       }
     
    -  private IndexWriter createPageIndexWriter() throws IOException {
    -    // save index data into ram, write into disk after one page finished
    -    RAMDirectory ramDir = new RAMDirectory();
    -    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
    -  }
    -
    -  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
    -
    -    Directory directory = pageIndexWriter.getDirectory();
    -
    -    // close ram writer
    -    pageIndexWriter.close();
    -
    -    // add ram index data into disk
    -    indexWriter.addIndexes(directory);
    -
    -    // delete this ram data
    -    directory.close();
    -  }
    -
    -  @Override
    -  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
    -    if (rowId == 0) {
    -      if (pageIndexWriter != null) {
    -        addPageIndex(pageIndexWriter);
    -      }
    -      pageIndexWriter = createPageIndexWriter();
    -    }
    -
    -    // create a new document
    -    Document doc = new Document();
    -
    -    // add blocklet Id
    -    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
    -
    -    // add page id
    -    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
    -
    -    // add row id
    -    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
    -    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
    +  @Override public void addRow(int blockletId, int pageId, int rowId, Object[] values)
    --- End diff --
   
    move override to the previous line


---

[GitHub] carbondata pull request #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r187769203
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -242,74 +258,216 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, (int) value));
           }
    -    } else if (type == DataTypes.INT) {
    +    } else if (key instanceof Integer) {
           // int type , use int point to deal with int type
    -      int value = (int) data;
    -      doc.add(new IntPoint(fieldName, value));
    +      int value = (Integer) key;
    +      doc.add(new IntPoint(fieldName, new int[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.LONG) {
    +    } else if (key instanceof Long) {
           // long type , use long point to deal with long type
    -      long value = (long) data;
    -      doc.add(new LongPoint(fieldName, value));
    +      long value = (Long) key;
    +      doc.add(new LongPoint(fieldName, new long[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.FLOAT) {
    -      float value = (float) data;
    -      doc.add(new FloatPoint(fieldName, value));
    +    } else if (key instanceof Float) {
    +      float value = (Float) key;
    +      doc.add(new FloatPoint(fieldName, new float[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new FloatPoint(fieldName, value));
           }
    -    } else if (type == DataTypes.DOUBLE) {
    -      double value = (double) data;
    -      doc.add(new DoublePoint(fieldName, value));
    +    } else if (key instanceof Double) {
    +      double value = (Double) key;
    +      doc.add(new DoublePoint(fieldName, new double[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new DoublePoint(fieldName, value));
           }
    +    } else if (key instanceof String) {
    +      String strValue = (String) key;
    +      doc.add(new TextField(fieldName, strValue, store));
    +    } else if (key instanceof Boolean) {
    +      boolean value = (Boolean) key;
    +      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    +      field.setIntValue(value ? 1 : 0);
    +      doc.add(field);
    +      if (store == Field.Store.YES) {
    +        doc.add(new StoredField(fieldName, value ? 1 : 0));
    +      }
    +    }
    +    return true;
    +  }
    +
    +  private Object getValue(ColumnPage page, int rowId) {
    +
    +    //get field type
    +    DataType type = page.getColumnSpec().getSchemaDataType();
    +    Object value = null;
    +    if (type == DataTypes.BYTE) {
    +      // byte type , use int range to deal with byte, lucene has no byte type
    +      value = page.getByte(rowId);
    +    } else if (type == DataTypes.SHORT) {
    +      // short type , use int range to deal with short type, lucene has no short type
    +      value = page.getShort(rowId);
    +    } else if (type == DataTypes.INT) {
    +      // int type , use int point to deal with int type
    +      value = page.getInt(rowId);
    +    } else if (type == DataTypes.LONG) {
    +      // long type , use long point to deal with long type
    +      value = page.getLong(rowId);
    +    } else if (type == DataTypes.FLOAT) {
    +      value = page.getFloat(rowId);
    +    } else if (type == DataTypes.DOUBLE) {
    +      value = page.getDouble(rowId);
         } else if (type == DataTypes.STRING) {
    -      byte[] value = (byte[]) data;
    +      byte[] bytes = page.getBytes(rowId);
           // TODO: how to get string value
    -      String strValue = null;
           try {
    -        strValue = new String(value, 2, value.length - 2, "UTF-8");
    +        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
           } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
           }
    -      doc.add(new TextField(fieldName, strValue, store));
         } else if (type == DataTypes.DATE) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.TIMESTAMP) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.BOOLEAN) {
    -      boolean value = (boolean) data;
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue(value ? 1 : 0);
    -      doc.add(field);
    -      if (store == Field.Store.YES) {
    -        doc.add(new StoredField(fieldName, value ? 1 : 0));
    -      }
    +      value = page.getBoolean(rowId);
         } else {
           LOGGER.error("unsupport data type " + type);
           throw new RuntimeException("unsupported data type " + type);
         }
    -    return true;
    +    return value;
    +  }
    +
    +  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
    +      boolean storeBlockletWise) {
    +    Map<Integer, RoaringBitmap> setMap = cache.get(key);
    +    if (setMap == null) {
    +      setMap = new HashMap<>();
    +      cache.put(key, setMap);
    +    }
    +    int combinKey;
    +    if (!storeBlockletWise) {
    +      intBuffer.clear();
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      combinKey = intBuffer.getInt();
    +    } else {
    +      combinKey = pageId;
    +    }
    +    RoaringBitmap bitSet = setMap.get(combinKey);
    +    if (bitSet == null) {
    +      bitSet = new RoaringBitmap();
    +      setMap.put(combinKey, bitSet);
    +    }
    +    bitSet.add(rowId);
    +  }
    +
    +  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
    +      boolean storeBlockletWise) throws IOException {
    +
    +    Document document = new Document();
    +    for (int i = 0; i < key.getColValues().length; i++) {
    +      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
    +    }
    +    intBuffer.clear();
    +    if (storeBlockletWise) {
    +      // No need to store blocklet id to it.
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.putShort((short) rowId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
    +    } else {
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
    +      document.add(new StoredField(ROWID_NAME, (short) rowId));
    +    }
    +    indexWriter.addDocument(document);
    +  }
    +
    +  private void flushCacheIfCan() throws IOException {
    +    if (cache.size() > cacheSize) {
    +      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
    +    }
    +  }
    +
    +  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
    +      List<CarbonColumn> indexCols, IndexWriter indexWriter, boolean storeBlockletWise)
    +      throws IOException {
    +    for (Map.Entry<LuceneColumnKeys, Map<Integer, RoaringBitmap>> entry : cache.entrySet()) {
    +      Document document = new Document();
    +      LuceneColumnKeys key = entry.getKey();
    +      for (int i = 0; i < key.getColValues().length; i++) {
    +        addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
    +      }
    +      Map<Integer, RoaringBitmap> value = entry.getValue();
    +      int count = 0;
    +      for (Map.Entry<Integer, RoaringBitmap> pageData : value.entrySet()) {
    +        RoaringBitmap bitMap = pageData.getValue();
    +        int cardinality = bitMap.getCardinality();
    +        // Each row is short and pageid is stored in int
    +        ByteBuffer byteBuffer = ByteBuffer.allocate(cardinality * 2 + 4);
    +        if (!storeBlockletWise) {
    +          byteBuffer.putInt(pageData.getKey());
    +        } else {
    +          byteBuffer.putShort(pageData.getKey().shortValue());
    +        }
    +        IntIterator intIterator = bitMap.getIntIterator();
    +        while (intIterator.hasNext()) {
    +          byteBuffer.putShort((short) intIterator.next());
    +        }
    +        document.add(new StoredField(PAGEID_NAME + count, byteBuffer.array()));
    +        count++;
    +      }
    +      indexWriter.addDocument(document);
    +    }
    +    cache.clear();
       }
     
       /**
        * This is called during closing of writer.So after this call no more data will be sent to this
        * class.
        */
       public void finish() throws IOException {
    +    flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
         // finished a file , close this index writer
         if (indexWriter != null) {
           indexWriter.close();
         }
       }
     
    +  public static class LuceneColumnKeys {
    +
    +    private Object[] colValues;
    +
    +    public LuceneColumnKeys(int size) {
    +      colValues = new Object[size];
    +    }
    +
    +    public Object[] getColValues() {
    +      return colValues;
    +    }
    +
    +    @Override public boolean equals(Object o) {
    --- End diff --
   
    move override to the previous line; there are other occurrences in this class as well.


---

[GitHub] carbondata issue #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4913/



---

[GitHub] carbondata issue #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4722/



---

[GitHub] carbondata issue #2275: [WIP] Fix lucene datasize and performance

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5876/



---

[GitHub] carbondata issue #2275: [CARBONDATA-2494] Fix lucene datasize and performanc...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4987/



---

[GitHub] carbondata issue #2275: [CARBONDATA-2494] Fix lucene datasize and performanc...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5961/



---

[GitHub] carbondata issue #2275: [CARBONDATA-2494] Fix lucene datasize and performanc...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2275
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4803/



---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189419746
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---
    @@ -212,69 +228,58 @@ private int getMaxDoc(Expression expression) {
           LOGGER.error(errorMessage);
           return null;
         }
    -
    -    // execute index search
    -    // initialize to null, else ScoreDoc objects will get accumulated in memory
    -    TopDocs result = null;
    -    try {
    -      result = indexSearcher.search(query, maxDocs);
    -    } catch (IOException e) {
    -      String errorMessage =
    -          String.format("failed to search lucene data, detail is %s", e.getMessage());
    -      LOGGER.error(errorMessage);
    -      throw new IOException(errorMessage);
    -    }
    -
         // temporary data, delete duplicated data
         // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
    -    Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>();
    -
    -    for (ScoreDoc scoreDoc : result.scoreDocs) {
    -      // get a document
    -      Document doc = indexSearcher.doc(scoreDoc.doc);
    +    Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
    +
    +    for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
    +      IndexSearcher indexSearcher = searcherEntry.getValue();
    +      // execute index search
    +      // initialize to null, else ScoreDoc objects will get accumulated in memory
    +      TopDocs result = null;
    +      try {
    +        result = indexSearcher.search(query, maxDocs);
    +      } catch (IOException e) {
    +        String errorMessage =
    +            String.format("failed to search lucene data, detail is %s", e.getMessage());
    +        LOGGER.error(errorMessage);
    +        throw new IOException(errorMessage);
    +      }
     
    -      // get all fields
    -      List<IndexableField> fieldsInDoc = doc.getFields();
    +      ByteBuffer intBuffer = ByteBuffer.allocate(4);
     
    -      // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
    -      String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
    -      Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId);
    -      if (mapPageIds == null) {
    -        mapPageIds = new HashMap<>();
    -        mapBlocks.put(blockletId, mapPageIds);
    -      }
    +      for (ScoreDoc scoreDoc : result.scoreDocs) {
    +        // get a document
    +        Document doc = indexSearcher.doc(scoreDoc.doc);
     
    -      // get the page id Map<PageId, Set<RowId>>
    -      Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
    -      Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
    -      if (setRowId == null) {
    -        setRowId = new HashSet<>();
    -        mapPageIds.put(pageId.intValue(), setRowId);
    +        // get all fields
    +        List<IndexableField> fieldsInDoc = doc.getFields();
    +        if (writeCacheSize > 0) {
    +          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
    --- End diff --
   
    please add a comment here describing fillMap and fillMapDirect
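    Something along these lines would help (a hypothetical sketch; fillMapDirect is not shown in this diff, so its description is an assumption based on the non-cached write path):

        // fillMap: decode the cached layout, where each document packs many
        // (pageId, rowId) pairs into byte-array stored fields (writeCacheSize > 0).
        // fillMapDirect: decode the one-document-per-row layout written when
        // the write cache is disabled.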


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189419824
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -242,73 +260,220 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, (int) value));
           }
    -    } else if (type == DataTypes.INT) {
    +    } else if (key instanceof Integer) {
           // int type , use int point to deal with int type
    -      int value = (int) data;
    -      doc.add(new IntPoint(fieldName, value));
    +      int value = (Integer) key;
    +      doc.add(new IntPoint(fieldName, new int[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.LONG) {
    +    } else if (key instanceof Long) {
           // long type , use long point to deal with long type
    -      long value = (long) data;
    -      doc.add(new LongPoint(fieldName, value));
    +      long value = (Long) key;
    +      doc.add(new LongPoint(fieldName, new long[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.FLOAT) {
    -      float value = (float) data;
    -      doc.add(new FloatPoint(fieldName, value));
    +    } else if (key instanceof Float) {
    +      float value = (Float) key;
    +      doc.add(new FloatPoint(fieldName, new float[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new FloatPoint(fieldName, value));
           }
    -    } else if (type == DataTypes.DOUBLE) {
    -      double value = (double) data;
    -      doc.add(new DoublePoint(fieldName, value));
    +    } else if (key instanceof Double) {
    +      double value = (Double) key;
    +      doc.add(new DoublePoint(fieldName, new double[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new DoublePoint(fieldName, value));
           }
    +    } else if (key instanceof String) {
    +      String strValue = (String) key;
    +      doc.add(new TextField(fieldName, strValue, store));
    +    } else if (key instanceof Boolean) {
    +      boolean value = (Boolean) key;
    +      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    +      field.setIntValue(value ? 1 : 0);
    +      doc.add(field);
    +      if (store == Field.Store.YES) {
    +        doc.add(new StoredField(fieldName, value ? 1 : 0));
    +      }
    +    }
    +    return true;
    +  }
    +
    +  private Object getValue(ColumnPage page, int rowId) {
    +
    +    //get field type
    +    DataType type = page.getColumnSpec().getSchemaDataType();
    +    Object value = null;
    +    if (type == DataTypes.BYTE) {
    +      // byte type , use int range to deal with byte, lucene has no byte type
    +      value = page.getByte(rowId);
    +    } else if (type == DataTypes.SHORT) {
    +      // short type , use int range to deal with short type, lucene has no short type
    +      value = page.getShort(rowId);
    +    } else if (type == DataTypes.INT) {
    +      // int type , use int point to deal with int type
    +      value = page.getInt(rowId);
    +    } else if (type == DataTypes.LONG) {
    +      // long type , use long point to deal with long type
    +      value = page.getLong(rowId);
    +    } else if (type == DataTypes.FLOAT) {
    +      value = page.getFloat(rowId);
    +    } else if (type == DataTypes.DOUBLE) {
    +      value = page.getDouble(rowId);
         } else if (type == DataTypes.STRING) {
    -      byte[] value = (byte[]) data;
    -      String strValue = null;
    +      byte[] bytes = page.getBytes(rowId);
           try {
    -        strValue = new String(value, 2, value.length - 2, "UTF-8");
    +        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
           } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
           }
    -      doc.add(new TextField(fieldName, strValue, store));
         } else if (type == DataTypes.DATE) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.TIMESTAMP) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.BOOLEAN) {
    -      boolean value = (boolean) data;
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue(value ? 1 : 0);
    -      doc.add(field);
    -      if (store == Field.Store.YES) {
    -        doc.add(new StoredField(fieldName, value ? 1 : 0));
    -      }
    +      value = page.getBoolean(rowId);
         } else {
           LOGGER.error("unsupport data type " + type);
           throw new RuntimeException("unsupported data type " + type);
         }
    -    return true;
    +    return value;
    +  }
    +
    +  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
    +      boolean storeBlockletWise) {
    +    Map<Integer, RoaringBitmap> setMap = cache.get(key);
    +    if (setMap == null) {
    +      setMap = new HashMap<>();
    +      cache.put(key, setMap);
    +    }
    +    int combinKey;
    +    if (!storeBlockletWise) {
    +      intBuffer.clear();
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      combinKey = intBuffer.getInt();
    +    } else {
    +      combinKey = pageId;
    +    }
    +    RoaringBitmap bitSet = setMap.get(combinKey);
    +    if (bitSet == null) {
    +      bitSet = new RoaringBitmap();
    +      setMap.put(combinKey, bitSet);
    +    }
    +    bitSet.add(rowId);
    +  }
    +
    +  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
    +      boolean storeBlockletWise) throws IOException {
    +
    +    Document document = new Document();
    +    for (int i = 0; i < key.getColValues().length; i++) {
    +      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
    +    }
    +    intBuffer.clear();
    +    if (storeBlockletWise) {
    +      // No need to store blocklet id to it.
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.putShort((short) rowId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
    +    } else {
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
    +      document.add(new StoredField(ROWID_NAME, (short) rowId));
    +    }
    +    indexWriter.addDocument(document);
    +  }
    +
    +  private void flushCacheIfPossible() throws IOException {
    +    if (cache.size() > cacheSize) {
    +      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
    +    }
    +  }
    +
    +  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
    --- End diff --
   
    It is not required to be public
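    Since the callers (e.g. LuceneDataMapRefresher) appear to live in the same package, package-private visibility should suffice, e.g. (a sketch, assuming all callers stay in org.apache.carbondata.datamap.lucene):

        static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
            List<CarbonColumn> indexCols, IndexWriter indexWriter, boolean storeBlockletWise)
            throws IOException {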


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189419987
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -84,20 +91,61 @@
     
       public static final String ROWID_NAME = "rowId";
     
    +  private Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache = new HashMap<>();
    +
    +  private int cacheSize;
    +
    +  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
    +
    +  private boolean storeBlockletWise;
    +
       LuceneDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
    -      Segment segment, String shardName, boolean isFineGrain) {
    +      Segment segment, String shardName, boolean isFineGrain, int flushSize,
    +      boolean storeBlockletWise) {
         super(tablePath, dataMapName, indexColumns, segment, shardName);
         this.isFineGrain = isFineGrain;
    --- End diff --
   
    It seems isFineGrain, indexShardName, and BLOCKLETID_NAME are not used


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189420060
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---
    @@ -107,13 +138,39 @@ public LuceneDataMapFactoryBase(CarbonTable carbonTable, DataMapSchema dataMapSc
         // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
         // optimizedOperations.add(ExpressionType.NOT);
         optimizedOperations.add(ExpressionType.TEXT_MATCH);
    -    this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
    -
    +    this.dataMapMeta = new DataMapMeta(indexedCarbonColumns, optimizedOperations);
         // get analyzer
         // TODO: how to get analyzer ?
         analyzer = new StandardAnalyzer();
       }
     
    +  public static int validateAndGetWriteCacheSize(DataMapSchema schema) {
    +    String cacheStr = schema.getProperties().get(FLUSH_CACHE);
    +    if (cacheStr == null) {
    +      cacheStr = FLUSH_CACHE_DEFAULT_SIZE;
    +    }
    +    int cacheSize;
    +    try {
    +      cacheSize = Integer.parseInt(cacheStr);
    +    } catch (NumberFormatException e) {
    +      cacheSize = -1;
    +    }
    +    return cacheSize;
    +  }
    +
    +  public static boolean validateAndGetStoreBlockletWise(DataMapSchema schema) {
    +    String splitBlockletStr = schema.getProperties().get(SPLIT_BLOCKLET);
    +    if (splitBlockletStr == null) {
    +      splitBlockletStr = SPLIT_BLOCKLET_DEFAULT;
    +    }
    +    boolean splitBlockletWise;
    +    try {
    +      splitBlockletWise = Boolean.parseBoolean(splitBlockletStr);
    +    } catch (NumberFormatException e) {
    +      splitBlockletWise = false;
    --- End diff --
   
    I think it is better to make the default true, so that the Lucene index is smaller
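    i.e. (a sketch, assuming SPLIT_BLOCKLET_DEFAULT is the string constant read above):

        public static final String SPLIT_BLOCKLET_DEFAULT = "true";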


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189420105
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -175,52 +206,39 @@ public void onBlockletEnd(int blockletId) throws IOException {
        */
       public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
           throws IOException {
    +    // save index data into ram, write into disk after one page finished
    +    int columnsCount = pages.length;
    +    if (columnsCount <= 0) {
    +      LOGGER.warn("No data in the page " + pageId + "with blockletid " + blockletId
    +          + " to write lucene datamap");
    +      return;
    +    }
         for (int rowId = 0; rowId < pageSize; rowId++) {
    -      // create a new document
    -      Document doc = new Document();
    -      // add blocklet Id
    -      doc.add(new IntPoint(BLOCKLETID_NAME, blockletId));
    -      doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
    -      //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
    -
    -      // add page id and row id in Fine Grain data map
    -      if (isFineGrain) {
    -        // add page Id
    -        doc.add(new IntPoint(PAGEID_NAME, pageId));
    -        doc.add(new StoredField(PAGEID_NAME, pageId));
    -        //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
    -
    -        // add row id
    -        doc.add(new IntPoint(ROWID_NAME, rowId));
    -        doc.add(new StoredField(ROWID_NAME, rowId));
    -        //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
    -      }
    -
           // add indexed columns value into the document
    -      List<CarbonColumn> indexColumns = getIndexColumns();
    -      for (int i = 0; i < pages.length; i++) {
    -        // add to lucene only if value is not null
    -        if (!pages[i].getNullBits().get(rowId)) {
    -          addField(doc, pages[i].getData(rowId), indexColumns.get(i), Field.Store.NO);
    +      LuceneColumnKeys columns = new LuceneColumnKeys(getIndexColumns().size());
    +      int i = 0;
    +      for (ColumnPage page : pages) {
    +        if (!page.getNullBits().get(rowId)) {
    +          columns.colValues[i++] = getValue(page, rowId);
             }
           }
    -
    -      // add this document
    -      ramIndexWriter.addDocument(doc);
    +      if (cacheSize > 0) {
    +        addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
    +      } else {
    +        addData(columns, rowId, pageId, blockletId, intBuffer, ramIndexWriter, getIndexColumns(),
    +            storeBlockletWise);
    +      }
    +    }
    +    if (cacheSize > 0) {
    +      flushCacheIfPossible();
         }
    -
       }
     
    -  private boolean addField(Document doc, Object data, CarbonColumn column, Field.Store store) {
    +  private static boolean addField(Document doc, Object key, String fieldName, Field.Store store) {
    --- End diff --
   
    This method always returns true, and the return value is not used by any caller
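    i.e. the return type could simply be void, dropping the trailing `return true;` (sketch):

        private static void addField(Document doc, Object key, String fieldName, Field.Store store) {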


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189439711
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---
    @@ -242,73 +260,220 @@ private boolean addField(Document doc, Object data, CarbonColumn column, Field.S
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, (int) value));
           }
    -    } else if (type == DataTypes.INT) {
    +    } else if (key instanceof Integer) {
           // int type , use int point to deal with int type
    -      int value = (int) data;
    -      doc.add(new IntPoint(fieldName, value));
    +      int value = (Integer) key;
    +      doc.add(new IntPoint(fieldName, new int[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.LONG) {
    +    } else if (key instanceof Long) {
           // long type , use long point to deal with long type
    -      long value = (long) data;
    -      doc.add(new LongPoint(fieldName, value));
    +      long value = (Long) key;
    +      doc.add(new LongPoint(fieldName, new long[] { value }));
     
           // if need store it , add StoredField
           if (store == Field.Store.YES) {
             doc.add(new StoredField(fieldName, value));
           }
    -    } else if (type == DataTypes.FLOAT) {
    -      float value = (float) data;
    -      doc.add(new FloatPoint(fieldName, value));
    +    } else if (key instanceof Float) {
    +      float value = (Float) key;
    +      doc.add(new FloatPoint(fieldName, new float[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new FloatPoint(fieldName, value));
           }
    -    } else if (type == DataTypes.DOUBLE) {
    -      double value = (double) data;
    -      doc.add(new DoublePoint(fieldName, value));
    +    } else if (key instanceof Double) {
    +      double value = (Double) key;
    +      doc.add(new DoublePoint(fieldName, new double[] { value }));
           if (store == Field.Store.YES) {
             doc.add(new DoublePoint(fieldName, value));
           }
    +    } else if (key instanceof String) {
    +      String strValue = (String) key;
    +      doc.add(new TextField(fieldName, strValue, store));
    +    } else if (key instanceof Boolean) {
    +      boolean value = (Boolean) key;
    +      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    +      field.setIntValue(value ? 1 : 0);
    +      doc.add(field);
    +      if (store == Field.Store.YES) {
    +        doc.add(new StoredField(fieldName, value ? 1 : 0));
    +      }
    +    }
    +    return true;
    +  }
    +
    +  private Object getValue(ColumnPage page, int rowId) {
    +
    +    //get field type
    +    DataType type = page.getColumnSpec().getSchemaDataType();
    +    Object value = null;
    +    if (type == DataTypes.BYTE) {
    +      // byte type , use int range to deal with byte, lucene has no byte type
    +      value = page.getByte(rowId);
    +    } else if (type == DataTypes.SHORT) {
    +      // short type , use int range to deal with short type, lucene has no short type
    +      value = page.getShort(rowId);
    +    } else if (type == DataTypes.INT) {
    +      // int type , use int point to deal with int type
    +      value = page.getInt(rowId);
    +    } else if (type == DataTypes.LONG) {
    +      // long type , use long point to deal with long type
    +      value = page.getLong(rowId);
    +    } else if (type == DataTypes.FLOAT) {
    +      value = page.getFloat(rowId);
    +    } else if (type == DataTypes.DOUBLE) {
    +      value = page.getDouble(rowId);
         } else if (type == DataTypes.STRING) {
    -      byte[] value = (byte[]) data;
    -      String strValue = null;
    +      byte[] bytes = page.getBytes(rowId);
           try {
    -        strValue = new String(value, 2, value.length - 2, "UTF-8");
    +        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
           } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
           }
    -      doc.add(new TextField(fieldName, strValue, store));
         } else if (type == DataTypes.DATE) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.TIMESTAMP) {
           throw new RuntimeException("unsupported data type " + type);
         } else if (type == DataTypes.BOOLEAN) {
    -      boolean value = (boolean) data;
    -      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
    -      field.setIntValue(value ? 1 : 0);
    -      doc.add(field);
    -      if (store == Field.Store.YES) {
    -        doc.add(new StoredField(fieldName, value ? 1 : 0));
    -      }
    +      value = page.getBoolean(rowId);
         } else {
           LOGGER.error("unsupport data type " + type);
           throw new RuntimeException("unsupported data type " + type);
         }
    -    return true;
    +    return value;
    +  }
    +
    +  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
    +      boolean storeBlockletWise) {
    +    Map<Integer, RoaringBitmap> setMap = cache.get(key);
    +    if (setMap == null) {
    +      setMap = new HashMap<>();
    +      cache.put(key, setMap);
    +    }
    +    int combinKey;
    +    if (!storeBlockletWise) {
    +      intBuffer.clear();
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      combinKey = intBuffer.getInt();
    +    } else {
    +      combinKey = pageId;
    +    }
    +    RoaringBitmap bitSet = setMap.get(combinKey);
    +    if (bitSet == null) {
    +      bitSet = new RoaringBitmap();
    +      setMap.put(combinKey, bitSet);
    +    }
    +    bitSet.add(rowId);
    +  }
    +
    +  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
    +      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
    +      boolean storeBlockletWise) throws IOException {
    +
    +    Document document = new Document();
    +    for (int i = 0; i < key.getColValues().length; i++) {
    +      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
    +    }
    +    intBuffer.clear();
    +    if (storeBlockletWise) {
    +      // No need to store blocklet id to it.
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.putShort((short) rowId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
    +    } else {
    +      intBuffer.putShort((short) blockletId);
    +      intBuffer.putShort((short) pageId);
    +      intBuffer.rewind();
    +      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
    +      document.add(new StoredField(ROWID_NAME, (short) rowId));
    +    }
    +    indexWriter.addDocument(document);
    +  }
    +
    +  private void flushCacheIfPossible() throws IOException {
    +    if (cache.size() > cacheSize) {
    +      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
    +    }
    +  }
    +
    +  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
    --- End diff --
   
    It is required because `LuceneDataMapBuilder` calls it.
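
    For reference, a self-contained sketch of the key packing that
    `addToCache` and `addData` rely on: a `ByteBuffer` is big-endian by
    default, so writing two shorts and reading one int packs blockletId into
    the high 16 bits and pageId into the low 16 bits. The ids below are made
    up for illustration:

        import java.nio.ByteBuffer;

        public class CombinedKeyDemo {
          public static void main(String[] args) {
            int blockletId = 3;
            int pageId = 17;
            ByteBuffer intBuffer = ByteBuffer.allocate(4);
            intBuffer.putShort((short) blockletId); // high 16 bits
            intBuffer.putShort((short) pageId);     // low 16 bits
            intBuffer.rewind();
            int combinedKey = intBuffer.getInt();   // == (3 << 16) | 17

            // Read side: split the int back into its two shorts.
            int readBlockletId = combinedKey >>> 16;
            int readPageId = combinedKey & 0xFFFF;
            System.out.println(readBlockletId + " / " + readPageId); // 3 / 17
          }
        }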


---

[GitHub] carbondata pull request #2275: [CARBONDATA-2494] Fix lucene datasize and per...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2275#discussion_r189439715
 
    --- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---
    @@ -212,69 +228,58 @@ private int getMaxDoc(Expression expression) {
           LOGGER.error(errorMessage);
           return null;
         }
    -
    -    // execute index search
    -    // initialize to null, else ScoreDoc objects will get accumulated in memory
    -    TopDocs result = null;
    -    try {
    -      result = indexSearcher.search(query, maxDocs);
    -    } catch (IOException e) {
    -      String errorMessage =
    -          String.format("failed to search lucene data, detail is %s", e.getMessage());
    -      LOGGER.error(errorMessage);
    -      throw new IOException(errorMessage);
    -    }
    -
         // temporary data, delete duplicated data
         // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
    -    Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>();
    -
    -    for (ScoreDoc scoreDoc : result.scoreDocs) {
    -      // get a document
    -      Document doc = indexSearcher.doc(scoreDoc.doc);
    +    Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
    +
    +    for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
    +      IndexSearcher indexSearcher = searcherEntry.getValue();
    +      // execute index search
    +      // initialize to null, else ScoreDoc objects will get accumulated in memory
    +      TopDocs result = null;
    +      try {
    +        result = indexSearcher.search(query, maxDocs);
    +      } catch (IOException e) {
    +        String errorMessage =
    +            String.format("failed to search lucene data, detail is %s", e.getMessage());
    +        LOGGER.error(errorMessage);
    +        throw new IOException(errorMessage);
    +      }
     
    -      // get all fields
    -      List<IndexableField> fieldsInDoc = doc.getFields();
    +      ByteBuffer intBuffer = ByteBuffer.allocate(4);
     
    -      // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
    -      String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
    -      Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId);
    -      if (mapPageIds == null) {
    -        mapPageIds = new HashMap<>();
    -        mapBlocks.put(blockletId, mapPageIds);
    -      }
    +      for (ScoreDoc scoreDoc : result.scoreDocs) {
    +        // get a document
    +        Document doc = indexSearcher.doc(scoreDoc.doc);
     
    -      // get the page id Map<PageId, Set<RowId>>
    -      Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
    -      Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
    -      if (setRowId == null) {
    -        setRowId = new HashSet<>();
    -        mapPageIds.put(pageId.intValue(), setRowId);
    +        // get all fields
    +        List<IndexableField> fieldsInDoc = doc.getFields();
    +        if (writeCacheSize > 0) {
    +          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
    --- End diff --
   
    ok
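
    For context, a hedged sketch of decoding the stored fields that `addData`
    writes in the non-blockletwise layout. The actual `fillMap` body is not
    shown in this diff, so the class and values below are illustrative only:

        import java.nio.ByteBuffer;

        public class StoredFieldDecodeDemo {
          public static void main(String[] args) {
            // addData stores PAGEID_NAME as (blockletId, pageId) packed into
            // one int, and ROWID_NAME as a plain short.
            int storedPageField = (5 << 16) | 42;
            ByteBuffer intBuffer = ByteBuffer.allocate(4);
            intBuffer.putInt(storedPageField);
            intBuffer.rewind();
            short blockletId = intBuffer.getShort(); // 5
            short pageId = intBuffer.getShort();     // 42
            System.out.println(blockletId + ", " + pageId);
          }
        }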


---