[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

qiuchenjian-2
GitHub user manishgupta88 opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/362

    [CARBONDATA-459] Block distribution is wrong in case of dynamic allocation=true

    Problem: Block distribution is wrong when dynamic allocation=true.

    Analysis: When dynamic allocation is enabled and the configured maximum number of executors is greater than the initial executors, Carbon does not request the configured maximum. Resources are therefore under-utilized, and when the number of blocks grows, block distribution is limited to the number of nodes and fewer tasks are launched. This under-utilization degrades both query and data load performance.

    Fix: Request the maximum number of configured executors when dynamic allocation is true.

    Impact area: Query and data load performance, due to under-utilization of resources.
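The executor-count decision this patch introduces can be sketched in plain Scala. The function name `executorsToRequest` is illustrative, not the actual CarbonData API; the branching mirrors the logic in the patched `DistributionUtil`:

```scala
// Hedged sketch of the block-distribution fix: decide how many executors to
// request given the number of blocks, the number of data-local nodes, and the
// configured maximum (spark.dynamicAllocation.maxExecutors or
// spark.executor.instances). Name and shape are illustrative.
def executorsToRequest(blockCount: Int, nodesOfData: Int, confExecutors: Int): Int = {
  if (nodesOfData < 1 || nodesOfData > confExecutors) {
    // no locality information, or more data-local nodes than we may request:
    // fall back to the configured maximum
    confExecutors
  } else if (confExecutors > nodesOfData && blockCount > nodesOfData) {
    // dynamic-allocation headroom: request extra executors for the surplus
    // blocks, capped at the configured maximum
    val extra = blockCount - nodesOfData
    if (extra > confExecutors) confExecutors else nodesOfData + extra
  } else {
    nodesOfData
  }
}

// The two worked examples from the patch's inline comments:
println(executorsToRequest(40, 3, 6)) // prints 6: all configured executors
println(executorsToRequest(4, 3, 6))  // prints 4: 3 nodes + 1 extra
```

The actual patch then passes this count to `CarbonContext.ensureExecutors`, which asks Spark for the executors; the sketch only captures the sizing arithmetic.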

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/incubator-carbondata dynamic_allocation_block_distribution

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #362
   
----
commit 0019dfc08ef589d75c45feecd8b559bca0b35f08
Author: manishgupta88 <[hidden email]>
Date:   2016-11-28T10:07:11Z

    Problem: Block distribution is wrong when dynamic allocation=true.

    Analysis: When dynamic allocation is enabled and the configured maximum number of executors is greater than the initial executors, Carbon does not request the configured maximum. Resources are therefore under-utilized, and when the number of blocks grows, block distribution is limited to the number of nodes and fewer tasks are launched. This under-utilization degrades both query and data load performance.

    Fix: Request the maximum number of configured executors when dynamic allocation is true.

    Impact area: Query and data load performance, due to under-utilization of resources.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031584
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---
    @@ -208,4 +208,36 @@ object CarbonContext {
         }
     
       }
    +
    +  /**
    +   * Requesting the extra executors other than the existing ones.
    +   *
    +   * @param sc                   sparkContext
    +   * @param requiredExecutors         required number of executors to be requested
    +   * @param localityAwareTasks   The number of pending tasks which is locality required
    +   * @param hostToLocalTaskCount A map to store hostname with its possible task number running on it
    +   * @return
    +   */
    +  final def ensureExecutors(sc: SparkContext,
    --- End diff --
   
    Do not add this function here; add it to DistributionUtil, as in #365. It will be reused for the spark2 integration.


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031795
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
         while (nodes.length < requiredExecutors && maxTimes > 0) {
    -      Thread.sleep(500)
    +      Thread.sleep(500);
           nodes = DistributionUtil.getNodeList(sparkContext)
    -      maxTimes = maxTimes - 1
    +      maxTimes = maxTimes - 1;
         }
    -    val timDiff = System.currentTimeMillis() - startTime
    -    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
    -    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
    -    nodes.distinct
    +    val timDiff = System.currentTimeMillis() - startTime;
    --- End diff --
   
    remove ;


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031916
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
         while (nodes.length < requiredExecutors && maxTimes > 0) {
    -      Thread.sleep(500)
    +      Thread.sleep(500);
           nodes = DistributionUtil.getNodeList(sparkContext)
    -      maxTimes = maxTimes - 1
    +      maxTimes = maxTimes - 1;
         }
    -    val timDiff = System.currentTimeMillis() - startTime
    -    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
    -    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
    -    nodes.distinct
    +    val timDiff = System.currentTimeMillis() - startTime;
    +    LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
    --- End diff --
   
    I think there is no need to change this; use the s"" interpolation style.
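For reference, a minimal illustration of the s"" style the reviewer asks for (the value of `timDiff` here is made up):

```scala
// Concatenation vs. the s"" interpolator: both produce the same string,
// but the interpolated form is the style requested in this review.
val timDiff = 1234L
val concatenated = "Total Time taken to ensure the required executors: " + timDiff
val interpolated = s"Total Time taken to ensure the required executors: $timDiff"
println(interpolated) // Total Time taken to ensure the required executors: 1234
```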


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032032
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
         while (nodes.length < requiredExecutors && maxTimes > 0) {
    -      Thread.sleep(500)
    +      Thread.sleep(500);
           nodes = DistributionUtil.getNodeList(sparkContext)
    -      maxTimes = maxTimes - 1
    +      maxTimes = maxTimes - 1;
         }
    -    val timDiff = System.currentTimeMillis() - startTime
    -    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
    -    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
    -    nodes.distinct
    +    val timDiff = System.currentTimeMillis() - startTime;
    +    LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
    +    LOGGER.info("Time elapsed to allocate the required executors : " + (10 - maxTimes) * 500)
    --- End diff --
   
    use s"" style


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032072
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
         while (nodes.length < requiredExecutors && maxTimes > 0) {
    -      Thread.sleep(500)
    +      Thread.sleep(500);
           nodes = DistributionUtil.getNodeList(sparkContext)
    -      maxTimes = maxTimes - 1
    +      maxTimes = maxTimes - 1;
    --- End diff --
   
    remove ;


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033298
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
    --- End diff --
   
    why change to 10?


---

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033528
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    --- End diff --
   
    please format the code style for parameter list correctly


---
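
The `getConfiguredExecutors` logic in the quoted diff reads defaults from `SparkConf`. The sketch below substitutes a plain `Map` for `SparkConf` so the branching can run standalone; the function name and the `Map` stand-in are illustrative only. Note the patch falls back to 1 in both branches even though, as its own comments say, Spark's real defaults are unbounded `maxExecutors` and 2 instances:

```scala
// Sketch of the getConfiguredExecutors lookup, with a plain Map standing in
// for SparkConf so the logic runs standalone (illustration only).
def configuredExecutors(conf: Map[String, String]): Int = {
  val dynamic = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
  if (dynamic) {
    // Spark's own default for maxExecutors is effectively unbounded;
    // the patch substitutes 1 when the property is unset
    conf.getOrElse("spark.dynamicAllocation.maxExecutors", "1").toInt
  } else {
    // Spark's documented default for executor instances is 2;
    // again the patch substitutes 1 when unset
    conf.getOrElse("spark.executor.instances", "1").toInt
  }
}
```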

[GitHub] incubator-carbondata issue #362: [CARBONDATA-459] Block distribution is wron...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/362
 
    @jackylk ...Handled all review comments. Kindly review and merge


---

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90365594
 
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
         sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
     
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
     
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
    --- End diff --
   
    can we make it calculated based on configuration instead of hard coding it?


---
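
One way to address the reviewer's request — deriving the retry budget from configuration rather than hard-coding `maxTimes` — is to poll until a configurable timeout elapses. This is a sketch under that assumption; `waitForExecutors` and its parameters are hypothetical names, and the node source is abstracted as a function so the loop runs standalone:

```scala
// Sketch: wait for executors to register, with the retry budget derived from
// a configurable timeout instead of a hard-coded count (names hypothetical).
def waitForExecutors(getNodes: () => Seq[String],
    requiredExecutors: Int,
    timeoutMillis: Long = 5000L,
    pollMillis: Long = 500L): Seq[String] = {
  var nodes = getNodes()
  var remaining = timeoutMillis / pollMillis // e.g. 5000 / 500 = 10 attempts
  while (nodes.length < requiredExecutors && remaining > 0) {
    Thread.sleep(pollMillis)
    nodes = getNodes()
    remaining -= 1
  }
  nodes.distinct
}
```

With `timeoutMillis` and `pollMillis` read from a properties file, the attempt count becomes tunable per deployment instead of a magic number in the source.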

[GitHub] incubator-carbondata issue #362: [CARBONDATA-459] Block distribution is wron...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/362
 
    I have given one more comment; other changes LGTM


---

[GitHub] incubator-carbondata issue #362: [CARBONDATA-459] Block distribution is wron...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/362
 
    @jackylk ...Handled review comment. Please review and merge


---

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/362


---