[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...


[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
GitHub user mohammadshahidkhan opened a pull request:

    https://github.com/apache/carbondata/pull/2429

    [CARBONDATA-2674][Streaming]Streaming with merge index enabled does not consider the merge index file while pruning.

    …
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [X] Any interfaces changed?
     None
     - [X] Any backward compatibility impacted?
     None
     - [X] Document update required?
    None
     - [X] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           Added a test to verify the merge index scenarios.
     - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
      NA


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mohammadshahidkhan/incubator-carbondata streaming_with_mi

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2429.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2429
   
----
commit 7fb282da1c51fca55a4c8c93e06c93b04a3a29e2
Author: mohammadshahidkhan <mohdshahidkhan1987@...>
Date:   2018-06-29T10:44:08Z

    [CARBONDATA-2674][Streaming]Streaming with merge index enabled does not consider the merge index file while pruning.

----


---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6654/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5481/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user mohammadshahidkhan commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    retest this please


---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6658/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5485/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6698/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5525/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5552/



---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199548100
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    +            byte[] value = entry.getValue();
    --- End diff --
   
    Please give this a better name; `value` is too generic.


---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199548471
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    +            byte[] value = entry.getValue();
    +            if (null != value) {
    +              indexReader.openThriftReader(value);
    +            } else {
    +              String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    +              String indexPath = segmentDir + File.separator + indexName;
    +              CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    +              // index file exists
    +              if (index.exists()) {
    +                // data file exists
    +                indexReader.openThriftReader(indexPath);
    +              }
    +            }
                 try {
                   // map block index
    -              indexReader.openThriftReader(indexPath);
    -              while (indexReader.hasNext()) {
    +              while (null != indexReader && indexReader.hasNext()) {
    --- End diff --
   
    `indexReader` is initialized on line 354; can it be null?


---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199549461
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    --- End diff --
   
    It seems the key is never used, so it is better to iterate over `carbonIndexMap.values()`.
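
The suggestion in this comment can be illustrated with a minimal, self-contained sketch. It is not code from the pull request: the class name `IndexMapValuesSketch`, the helper `countLoaded`, and the sample file names are hypothetical, and the map only mimics the `Map<String, byte[]>` shape shown in the diff. It shows that when the keys are not needed, `Map.values()` gives the same payloads as `entrySet()` with less ceremony.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IndexMapValuesSketch {
  // Counts the non-null index payloads by iterating values() directly,
  // because the keys (index file names) are not needed for this loop.
  static int countLoaded(Map<String, byte[]> carbonIndexMap) {
    int loaded = 0;
    for (byte[] fileData : carbonIndexMap.values()) {
      if (fileData != null) {
        loaded++;
      }
    }
    return loaded;
  }

  public static void main(String[] args) {
    // Hypothetical sample data: one entry with content, one without.
    Map<String, byte[]> carbonIndexMap = new LinkedHashMap<>();
    carbonIndexMap.put("a.carbonindex", new byte[] {1, 2});
    carbonIndexMap.put("b.carbonindex", null);
    System.out.println(countLoaded(carbonIndexMap));
  }
}
```

Iterating `entrySet()` remains the right choice whenever the loop body also needs the key.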


---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user mohammadshahidkhan commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199685348
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    +            byte[] value = entry.getValue();
    +            if (null != value) {
    +              indexReader.openThriftReader(value);
    +            } else {
    +              String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    +              String indexPath = segmentDir + File.separator + indexName;
    +              CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    +              // index file exists
    +              if (index.exists()) {
    +                // data file exists
    +                indexReader.openThriftReader(indexPath);
    +              }
    +            }
                 try {
                   // map block index
    -              indexReader.openThriftReader(indexPath);
    -              while (indexReader.hasNext()) {
    +              while (null != indexReader && indexReader.hasNext()) {
    --- End diff --
   
    Thanks @jackylk, `indexReader` cannot be null.
    However, `CarbonIndexFileReader.hasNext()` can throw a NullPointerException if the index file does not exist,
    because the reader is only opened inside the existence check below, so I corrected the `hasNext` API.

        if (index.exists()) {
          // data file exists
          indexReader.openThriftReader(indexPath);
        }
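
The thread does not show how `hasNext` was corrected, so the following is only a sketch of one common way to make such a method safe on a reader that was never opened. `SketchIndexReader` is a hypothetical stand-in, not CarbonData's `CarbonIndexFileReader`, and its `open` method and string entries are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class NullSafeReaderSketch {
  /** Hypothetical stand-in for an index reader; not CarbonData's actual class. */
  static class SketchIndexReader {
    private Iterator<String> entries; // stays null until open() is called

    void open(List<String> indexEntries) {
      this.entries = indexEntries.iterator();
    }

    // Returns false instead of throwing when the reader was never opened.
    boolean hasNext() {
      return entries != null && entries.hasNext();
    }

    String next() {
      return entries.next();
    }
  }

  // Drains the reader; the loop needs no extra null check on the caller side.
  static int countEntries(SketchIndexReader reader) {
    int count = 0;
    while (reader.hasNext()) {
      reader.next();
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    SketchIndexReader unopened = new SketchIndexReader();
    System.out.println(countEntries(unopened));

    SketchIndexReader opened = new SketchIndexReader();
    opened.open(Arrays.asList("block-0", "block-1"));
    System.out.println(countEntries(opened));
  }
}
```

With the guard inside `hasNext`, a caller loop of the form `while (reader.hasNext())` is safe even when the open call was skipped because the file was missing.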


---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user mohammadshahidkhan commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199685364
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    --- End diff --
   
    Fixed


---

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user mohammadshahidkhan commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199685424
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    +            byte[] value = entry.getValue();
    --- End diff --
   
    Fixed; renamed to `fileData`.


---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5550/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5569/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5552/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6723/



---

[GitHub] carbondata issue #2429: [CARBONDATA-2674][Streaming]Streaming with merge ind...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2429
 
    LGTM


---