[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...


[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167119457
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -455,83 +481,60 @@ public static Dictionary getDictionary(AbsoluteTableIdentifier absoluteTableIden
         return nodeBlockMapping(blockInfos, -1);
       }
     
    -  /**
    -   * the method returns the number of required executors
    -   *
    -   * @param blockInfos
    -   * @return
    -   */
    -  public static Map<String, List<Distributable>> getRequiredExecutors(
    -      List<Distributable> blockInfos) {
    -    List<NodeBlockRelation> flattenedList =
    -        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    -    for (Distributable blockInfo : blockInfos) {
    -      try {
    -        for (String eachNode : blockInfo.getLocations()) {
    -          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
    -          flattenedList.add(nbr);
    -        }
    -      } catch (IOException e) {
    -        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
    -      }
    -    }
    -    // sort the flattened data.
    -    Collections.sort(flattenedList);
    -    Map<String, List<Distributable>> nodeAndBlockMapping =
    -        new LinkedHashMap<String, List<Distributable>>(
    -            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    -    // from the flattened list create a mapping of node vs Data blocks.
    -    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
    -    return nodeAndBlockMapping;
    -  }
    -
       /**
        * This method will divide the blocks among the nodes as per the data locality
        *
        * @param blockInfos
        * @param noOfNodesInput -1 if number of nodes has to be decided
        *                       based on block location information
    +   * @param blockAssignmentStrategy strategy used to assign blocks
        * @return
    --- End diff --
   
    please complete the description for @return


---

[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167119550
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
    --- End diff --
   
    Please remove @param if no description


---

[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167119617
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
        */
    -  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
    -      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
    +  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
    +    for (int idx = 0; idx < outputMap.size(); idx++) {
    +      node2Idx.put(outputMap.get(idx).getNode(), idx);
    +    }
     
    +    // iterate all the nodes and try to allocate blocks to the nodes
         if (activeNodes != null) {
           for (String activeNode : activeNodes) {
    -        List<Distributable> blockLst = outputMap.get(activeNode);
    -        if (null == blockLst) {
    +        if (LOGGER.isDebugEnabled()) {
    +          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
    +        }
    +
    +        Integer idx;
    +        List<Distributable> blockLst;
    +        if (node2Idx.containsKey(activeNode)) {
    +          idx = node2Idx.get(activeNode);
    +          blockLst = outputMap.get(idx).getBlocks();
    +        } else {
    +          idx = node2Idx.size();
               blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
             }
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    -        if (blockLst.size() > 0) {
    -          outputMap.put(activeNode, blockLst);
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
    +
    +        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
    +          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
    +          node2Idx.put(activeNode, idx);
             }
           }
         } else {
    -      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -        List<Distributable> blockLst = entry.getValue();
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    +      for (NodeMultiBlockRelation entry : outputMap) {
    +        List<Distributable> blockLst = entry.getBlocks();
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
           }
    -
         }
     
    -    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -      Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -      if (blocks.hasNext()) {
    -        Distributable block = blocks.next();
    -        List<Distributable> blockLst = entry.getValue();
    -        blockLst.add(block);
    -        blocks.remove();
    -      }
    -    }
    +    // if there is still blocks left, allocate them in round robin manner to each nodes
    +    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
       }
     
       /**
    -   * The method populate the blockLst to be allocate to a specific node.
    -   * @param uniqueBlocks
    -   * @param noOfBlocksPerNode
    +   * assign blocks to nodes
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   * @param blockAssignmentStrategy
    --- End diff --
   
    Please remove @param if no description


---

[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167120719
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
        */
    -  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
    -      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
    +  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
    +    for (int idx = 0; idx < outputMap.size(); idx++) {
    +      node2Idx.put(outputMap.get(idx).getNode(), idx);
    +    }
     
    +    // iterate all the nodes and try to allocate blocks to the nodes
         if (activeNodes != null) {
           for (String activeNode : activeNodes) {
    -        List<Distributable> blockLst = outputMap.get(activeNode);
    -        if (null == blockLst) {
    +        if (LOGGER.isDebugEnabled()) {
    +          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
    +        }
    +
    +        Integer idx;
    +        List<Distributable> blockLst;
    +        if (node2Idx.containsKey(activeNode)) {
    +          idx = node2Idx.get(activeNode);
    +          blockLst = outputMap.get(idx).getBlocks();
    +        } else {
    +          idx = node2Idx.size();
               blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
             }
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    -        if (blockLst.size() > 0) {
    -          outputMap.put(activeNode, blockLst);
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
    +
    +        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
    +          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
    +          node2Idx.put(activeNode, idx);
             }
           }
         } else {
    -      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -        List<Distributable> blockLst = entry.getValue();
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    +      for (NodeMultiBlockRelation entry : outputMap) {
    +        List<Distributable> blockLst = entry.getBlocks();
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
           }
    -
         }
     
    -    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -      Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -      if (blocks.hasNext()) {
    -        Distributable block = blocks.next();
    -        List<Distributable> blockLst = entry.getValue();
    -        blockLst.add(block);
    -        blocks.remove();
    -      }
    -    }
    +    // if there is still blocks left, allocate them in round robin manner to each nodes
    +    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
       }
     
       /**
    -   * The method populate the blockLst to be allocate to a specific node.
    -   * @param uniqueBlocks
    -   * @param noOfBlocksPerNode
    +   * assign blocks to nodes
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void populateBlocks(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      default:
    +        throw new IllegalArgumentException(
    +            "Unsupported block assignment strategy: " + blockAssignmentStrategy);
    +    }
    +  }
    +  /**
    +   * allocate blocks by block num
    --- End diff --
   
    Take N number of distributable blocks from `remainingBlocks` and add them to the output parameter `blockList`. Blocks are added while their accumulated size is less than `expectedSizePerNode`
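The count-based branch (`BLOCK_NUM_FIRST`) discussed in this hunk can be illustrated with a minimal standalone sketch. Blocks are modelled as plain strings and the class/method names here are illustrative, not CarbonData's; the draining-iterator pattern mirrors the quoted `populateBlocksByNum`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BlockNumAllocator {

  // Move blocks out of `remaining` into `target` until the node holds
  // `expectedBlocksPerNode` blocks or no blocks are left.
  static void populateByNum(Set<String> remaining, int expectedBlocksPerNode,
      List<String> target) {
    if (target.size() >= expectedBlocksPerNode) {
      return; // this node already has its share
    }
    Iterator<String> it = remaining.iterator();
    while (it.hasNext()) {
      target.add(it.next());
      it.remove(); // taken blocks must not be assigned twice
      if (target.size() >= expectedBlocksPerNode) {
        break;
      }
    }
  }

  public static void main(String[] args) {
    Set<String> remaining = new LinkedHashSet<>(Arrays.asList("b1", "b2", "b3"));
    List<String> node = new ArrayList<>();
    populateByNum(remaining, 2, node);
    System.out.println("assigned=" + node + " leftover=" + remaining);
  }
}
```

Removing through the iterator is what makes each block land on exactly one node when this is called once per node over the same `remaining` set.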


---

[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167120778
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
        */
    -  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
    -      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
    +  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
    +    for (int idx = 0; idx < outputMap.size(); idx++) {
    +      node2Idx.put(outputMap.get(idx).getNode(), idx);
    +    }
     
    +    // iterate all the nodes and try to allocate blocks to the nodes
         if (activeNodes != null) {
           for (String activeNode : activeNodes) {
    -        List<Distributable> blockLst = outputMap.get(activeNode);
    -        if (null == blockLst) {
    +        if (LOGGER.isDebugEnabled()) {
    +          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
    +        }
    +
    +        Integer idx;
    +        List<Distributable> blockLst;
    +        if (node2Idx.containsKey(activeNode)) {
    +          idx = node2Idx.get(activeNode);
    +          blockLst = outputMap.get(idx).getBlocks();
    +        } else {
    +          idx = node2Idx.size();
               blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
             }
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    -        if (blockLst.size() > 0) {
    -          outputMap.put(activeNode, blockLst);
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
    +
    +        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
    +          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
    +          node2Idx.put(activeNode, idx);
             }
           }
         } else {
    -      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -        List<Distributable> blockLst = entry.getValue();
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    +      for (NodeMultiBlockRelation entry : outputMap) {
    +        List<Distributable> blockLst = entry.getBlocks();
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
           }
    -
         }
     
    -    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -      Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -      if (blocks.hasNext()) {
    -        Distributable block = blocks.next();
    -        List<Distributable> blockLst = entry.getValue();
    -        blockLst.add(block);
    -        blocks.remove();
    -      }
    -    }
    +    // if there is still blocks left, allocate them in round robin manner to each nodes
    +    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
       }
     
       /**
    -   * The method populate the blockLst to be allocate to a specific node.
    -   * @param uniqueBlocks
    -   * @param noOfBlocksPerNode
    +   * assign blocks to nodes
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void populateBlocks(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      default:
    +        throw new IllegalArgumentException(
    +            "Unsupported block assignment strategy: " + blockAssignmentStrategy);
    +    }
    +  }
    +  /**
    +   * allocate blocks by block num
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
        * @param blockLst
        */
    -  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
    -      List<Distributable> blockLst) {
    -    Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -    //if the node is already having the per block nodes then avoid assign the extra blocks
    -    if (blockLst.size() == noOfBlocksPerNode) {
    +  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    // if the node is already having the per block nodes then avoid assign the extra blocks
    +    if (blockLst.size() == expectedSizePerNode) {
           return;
         }
         while (blocks.hasNext()) {
           Distributable block = blocks.next();
           blockLst.add(block);
           blocks.remove();
    -      if (blockLst.size() >= noOfBlocksPerNode) {
    +      if (blockLst.size() >= expectedSizePerNode) {
    +        break;
    +      }
    +    }
    +  }
    +
    +  /**
    +   * allocate blocks by block size
    --- End diff --
   
    Change to:
    Take N number of distributable blocks from remainingBlocks and add them to the output parameter blockList. The number of blocks added is determined by the accumulated block size, which must stay below expectedSizePerNode



---

[GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167121062
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
        */
    -  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
    -      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
    +  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
    +    for (int idx = 0; idx < outputMap.size(); idx++) {
    +      node2Idx.put(outputMap.get(idx).getNode(), idx);
    +    }
     
    +    // iterate all the nodes and try to allocate blocks to the nodes
         if (activeNodes != null) {
           for (String activeNode : activeNodes) {
    -        List<Distributable> blockLst = outputMap.get(activeNode);
    -        if (null == blockLst) {
    +        if (LOGGER.isDebugEnabled()) {
    +          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
    +        }
    +
    +        Integer idx;
    +        List<Distributable> blockLst;
    +        if (node2Idx.containsKey(activeNode)) {
    +          idx = node2Idx.get(activeNode);
    +          blockLst = outputMap.get(idx).getBlocks();
    +        } else {
    +          idx = node2Idx.size();
               blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
             }
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    -        if (blockLst.size() > 0) {
    -          outputMap.put(activeNode, blockLst);
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
    +
    +        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
    +          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
    +          node2Idx.put(activeNode, idx);
             }
           }
         } else {
    -      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -        List<Distributable> blockLst = entry.getValue();
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    +      for (NodeMultiBlockRelation entry : outputMap) {
    +        List<Distributable> blockLst = entry.getBlocks();
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
           }
    -
         }
     
    -    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -      Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -      if (blocks.hasNext()) {
    -        Distributable block = blocks.next();
    -        List<Distributable> blockLst = entry.getValue();
    -        blockLst.add(block);
    -        blocks.remove();
    -      }
    -    }
    +    // if there is still blocks left, allocate them in round robin manner to each nodes
    +    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
       }
     
       /**
    -   * The method populate the blockLst to be allocate to a specific node.
    -   * @param uniqueBlocks
    -   * @param noOfBlocksPerNode
    +   * assign blocks to nodes
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void populateBlocks(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      default:
    +        throw new IllegalArgumentException(
    +            "Unsupported block assignment strategy: " + blockAssignmentStrategy);
    +    }
    +  }
    +  /**
    +   * allocate blocks by block num
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
        * @param blockLst
        */
    -  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
    -      List<Distributable> blockLst) {
    -    Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -    //if the node is already having the per block nodes then avoid assign the extra blocks
    -    if (blockLst.size() == noOfBlocksPerNode) {
    +  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    // if the node is already having the per block nodes then avoid assign the extra blocks
    +    if (blockLst.size() == expectedSizePerNode) {
           return;
         }
         while (blocks.hasNext()) {
           Distributable block = blocks.next();
           blockLst.add(block);
           blocks.remove();
    -      if (blockLst.size() >= noOfBlocksPerNode) {
    +      if (blockLst.size() >= expectedSizePerNode) {
    +        break;
    +      }
    +    }
    +  }
    +
    +  /**
    +   * allocate blocks by block size
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   */
    +  private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    //if the node is already having the avg node size then avoid assign the extra blocks
    +    long fileSize = 0;
    +    for (Distributable block : blockLst) {
    +      fileSize += ((TableBlockInfo) block).getBlockLength();
    +    }
    +    if (fileSize >= expectedSizePerNode) {
    +      LOGGER.debug("Capacity is full, skip allocate blocks on this node");
    +      return;
    +    }
    +
    +    while (blocks.hasNext()) {
    +      Distributable block = blocks.next();
    +      long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
    +      if (fileSize < expectedSizePerNode) {
    +        // `fileSize==0` means there are no blocks assigned to this node before
    +        if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 1.1D) {
    +          blockLst.add(block);
    +          if (LOGGER.isDebugEnabled()) {
    +            LOGGER.debug("Second Assignment iteration: "
    +                + ((TableBlockInfo) block).getFilePath() + "-"
    +                + ((TableBlockInfo) block).getBlockLength() + "-->currentNode");
    +          }
    +          fileSize += thisBlockSize;
    +          blocks.remove();
    +        }
    +      } else {
             break;
           }
         }
       }
     
    +  /**
    +   * allocate the blocks in round robin manner
    +   * @param node2Blocks
    +   * @param remainingBlocks
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
    +      Set<Distributable> remainingBlocks, BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
    +        break;
    +      default:
    +        throw new IllegalArgumentException("Unsupported block assignment strategy: "
    +            + blockAssignmentStrategy);
    +    }
    +  }
    +
    +  private static void roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> remainingBlocks) {
    +    for (NodeMultiBlockRelation relation: outputMap) {
    +      Iterator<Distributable> blocks = remainingBlocks.iterator();
    +      if (blocks.hasNext()) {
    +        Distributable block = blocks.next();
    +        List<Distributable> blockLst = relation.getBlocks();
    +        blockLst.add(block);
    +        blocks.remove();
    +      }
    +    }
    +  }
    +
    +  private static void roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> remainingBlocks) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    while (blocks.hasNext()) {
    +      // sort the allocated node-2-blocks in ascending order, the total data size of first one is
    +      // the smallest, so we assign this block to it.
    +      Collections.sort(outputMap, NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
    +      Distributable block = blocks.next();
    +      List<Distributable> blockLst = outputMap.get(0).getBlocks();
    +      blockLst.add(block);
    +      if (LOGGER.isDebugEnabled()) {
    +        LOGGER.debug("RoundRobin assignment iteration: "
    +            + ((TableBlockInfo) block).getFilePath() + "-"
    +            + ((TableBlockInfo) block).getBlockLength() + "-->" + outputMap.get(0).getNode());
    +      }
    +      blocks.remove();
    +    }
    +  }
       /**
        * To create the final output of the Node and Data blocks
        *
    -   * @param outputMap
    -   * @param blocksPerNode
    -   * @param uniqueBlocks
    -   * @param nodeAndBlockMapping
    +   * @param outputNode2Blocks
    +   * @param expectedSizePerNode
    +   * @param remainingBlocks
    +   * @param inputNode2Blocks
    --- End diff --
   
    remove @param if no description
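The `roundRobinAssignBlocksBySize` method in this hunk repeatedly sorts the node list in ascending order of assigned data size and gives the next leftover block to the currently smallest node. A self-contained sketch of that balancing loop, where the `Node` class is a stand-in for CarbonData's `NodeMultiBlockRelation` and its size comparator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinBySize {

  // Stand-in for NodeMultiBlockRelation: a node name plus the sizes of
  // the blocks assigned to it.
  static class Node {
    final String name;
    final List<Long> blockSizes = new ArrayList<>();

    Node(String name) {
      this.name = name;
    }

    long totalSize() {
      long total = 0;
      for (long size : blockSizes) {
        total += size;
      }
      return total;
    }
  }

  // For every leftover block, re-sort the nodes by ascending assigned
  // size and hand the block to the currently smallest node.
  static void assign(List<Node> nodes, Set<Long> remaining) {
    Iterator<Long> it = remaining.iterator();
    while (it.hasNext()) {
      nodes.sort(Comparator.comparingLong(Node::totalSize));
      nodes.get(0).blockSizes.add(it.next());
      it.remove();
    }
  }

  public static void main(String[] args) {
    Node a = new Node("nodeA");
    a.blockSizes.add(100L);
    Node b = new Node("nodeB");
    List<Node> nodes = new ArrayList<>(Arrays.asList(a, b));
    Set<Long> leftover = new LinkedHashSet<>(Arrays.asList(40L, 30L));
    assign(nodes, leftover);
    System.out.println(a.name + "=" + a.totalSize() + " " + b.name + "=" + b.totalSize());
  }
}
```

Re-sorting inside the loop is O(n log n) per block, which is acceptable here because only the small set of leftover blocks (those that escaped locality-aware assignment) ever reaches this pass.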


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    retest this please


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3608/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    retest this please


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2370/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3609/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    retest this please


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3456/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    retest this please


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2373/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3610/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3613/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    retest this please


---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3619/



---

[GitHub] carbondata issue #1808: [CARBONDATA-2023][DataLoad] Add size base block allo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1808
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2379/



---