[GitHub] carbondata pull request #1702: [WIP][CARBONDATA-1896] Clean files operation ...


[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
Github user dhatchayani commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    retest this please


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2257/



---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1034/



---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2512/



---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    retest this please


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2513/



---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2261/



---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158474524
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -178,6 +178,7 @@ case class CarbonLoadDataCommand(
             // First system has to partition the data first and then call the load data
             LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
             GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
    +        DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
    --- End diff --
   
    Please add the reason why we have to move the deletion call from CarbonDataRDDFactory.scala to here.


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1039/



---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158477319
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
    @@ -133,16 +138,23 @@ public static boolean deleteLoadFoldersFromFileSystem(
           for (LoadMetadataDetails oneLoad : details) {
             if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
               String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
    -          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    -          if (deletionStatus) {
    -            isDeleted = true;
    -            oneLoad.setVisibility("false");
    -            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    --- End diff --
   
    1) Please add a log for the lock acquire status.
    2) Please re-check the latest status in case the currently in-progress load has just completed; otherwise we may accidentally delete a completed load.
    3) Can we use the lock without retries? This work is only for cleanup, and retries may make the clean files command very slow due to the new lock.
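    For illustration, the loop body these three points suggest could look roughly like the sketch below. It is an illustration only, not the merged code: `getCurrentLoadStatusOfSegment` is the helper that shows up in a later diff in this thread, and `path`, `metadataPath`, `isForceDelete` and `isDeleted` come from the surrounding loop in the diff above.

        ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
        try {
          // (3) a bounded-retry acquire could be used here instead, per a later suggestion in this thread
          if (segmentLock.lockWithRetries()) {
            // (1) log the lock acquire status
            LOGGER.info("Acquired segment lock on segment " + oneLoad.getLoadName());
            // (2) the load may have finished between reading the table status and
            // acquiring the lock, so re-read the latest status before deleting
            LoadMetadataDetails currentDetail =
                getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
            if (currentDetail != null && checkIfLoadCanBeDeleted(currentDetail, isForceDelete)) {
              if (physicalFactAndMeasureMetadataDeletion(path)) {
                isDeleted = true;
                oneLoad.setVisibility("false");
                LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
              }
            }
          } else {
            // (1) also log the failure case
            LOGGER.info("Could not acquire segment lock on segment " + oneLoad.getLoadName());
          }
        } finally {
          // release in finally so an exception during deletion cannot leak the lock
          segmentLock.unlock();
        }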


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158480429
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -418,8 +429,11 @@ object CarbonDataRDDFactory {
                   errorMessage = errorMessage + ": " + executorMessage
                 }
             }
    +        segmentLock.unlock()
    --- End diff --
   
    Please remove this; the unlock in the finally block is enough.


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158480479
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
    @@ -133,16 +138,23 @@ public static boolean deleteLoadFoldersFromFileSystem(
           for (LoadMetadataDetails oneLoad : details) {
             if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
               String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
    -          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    -          if (deletionStatus) {
    -            isDeleted = true;
    -            oneLoad.setVisibility("false");
    -            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    +              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
    +          if (segmentLock.lockWithRetries()) {
    +            boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    +            if (deletionStatus) {
    +              isDeleted = true;
    +              oneLoad.setVisibility("false");
    +              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +            }
    +            segmentLock.unlock();
    --- End diff --
   
    Please move the unlock to the finally block.


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158490560
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
    @@ -133,16 +138,23 @@ public static boolean deleteLoadFoldersFromFileSystem(
           for (LoadMetadataDetails oneLoad : details) {
             if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
               String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
    -          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    -          if (deletionStatus) {
    -            isDeleted = true;
    -            oneLoad.setVisibility("false");
    -            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    --- End diff --
   
    The logic is divided.


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    retest sdv please


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2271/



---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158509584
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -315,88 +314,100 @@ object CarbonDataRDDFactory {
         val isSortTable = carbonTable.getNumberOfSortColumns > 0
         val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
     
    +    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
    +      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
    +
         try {
    -      if (updateModel.isDefined) {
    -        res = loadDataFrameForUpdate(
    -          sqlContext,
    -          dataFrame,
    -          carbonLoadModel,
    -          updateModel,
    -          carbonTable)
    -        res.foreach { resultOfSeg =>
    -          resultOfSeg.foreach { resultOfBlock =>
    -            if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
    -              loadStatus = SegmentStatus.LOAD_FAILURE
    -              if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
    -                updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
    -                updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
    -              } else {
    -                updateModel.get.executorErrors = resultOfBlock._2._2
    +      if (segmentLock.lockWithRetries()) {
    +        if (updateModel.isDefined) {
    +          res = loadDataFrameForUpdate(
    +            sqlContext,
    +            dataFrame,
    +            carbonLoadModel,
    +            updateModel,
    +            carbonTable)
    +          res.foreach { resultOfSeg =>
    +            resultOfSeg.foreach { resultOfBlock =>
    +              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
    +                loadStatus = SegmentStatus.LOAD_FAILURE
    +                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
    +                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
    +                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
    +                } else {
    +                  updateModel.get.executorErrors = resultOfBlock._2._2
    +                }
    +              } else if (resultOfBlock._2._1.getSegmentStatus ==
    +                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    +                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
    +                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
                   }
    -            } else if (resultOfBlock._2._1.getSegmentStatus ==
    -                       SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    -              loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    -              updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
    -              updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
                 }
               }
    -        }
    -      } else {
    -        status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
    -          loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
    -        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
    -          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
    -            dataFrame, carbonLoadModel, hadoopConf)
    -        } else if (dataFrame.isDefined) {
    -          loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
             } else {
    -          loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
    -        }
    -        CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
    -          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
    -        val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
    -        if (status.nonEmpty) {
    -          status.foreach { eachLoadStatus =>
    -            val state = newStatusMap.get(eachLoadStatus._1)
    -            state match {
    -              case Some(SegmentStatus.LOAD_FAILURE) =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -              case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    -                if eachLoadStatus._2._1.getSegmentStatus ==
    -                   SegmentStatus.SUCCESS =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -              case _ =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -            }
    +          status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
    +            loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
    +          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
    +            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
    +              dataFrame, carbonLoadModel, hadoopConf)
    +          } else if (dataFrame.isDefined) {
    +            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
    +          } else {
    +            loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
               }
    -
    -          newStatusMap.foreach {
    -            case (key, value) =>
    -              if (value == SegmentStatus.LOAD_FAILURE) {
    -                loadStatus = SegmentStatus.LOAD_FAILURE
    -              } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
    -                         loadStatus!= SegmentStatus.LOAD_FAILURE) {
    -                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
    +            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
    +          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
    +          if (status.nonEmpty) {
    +            status.foreach { eachLoadStatus =>
    +              val state = newStatusMap.get(eachLoadStatus._1)
    +              state match {
    +                case Some(SegmentStatus.LOAD_FAILURE) =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    +                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +                  if eachLoadStatus._2._1.getSegmentStatus ==
    +                     SegmentStatus.SUCCESS =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    +                case _ =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                   }
    -          }
    -        } else {
    -          // if no value is there in data load, make load status Success
    -          // and data load flow executes
    -          if (dataFrame.isDefined && updateModel.isEmpty) {
    -            val rdd = dataFrame.get.rdd
    -            if (rdd.partitions == null || rdd.partitions.length == 0) {
    -              LOGGER.warn("DataLoading finished. No data was loaded.")
    -              loadStatus = SegmentStatus.SUCCESS
    +            }
    +
    +            newStatusMap.foreach {
    +              case (key, value) =>
    +                if (value == SegmentStatus.LOAD_FAILURE) {
    +                  loadStatus = SegmentStatus.LOAD_FAILURE
    +                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
    +                           loadStatus != SegmentStatus.LOAD_FAILURE) {
    +                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +                }
                 }
               } else {
    -            loadStatus = SegmentStatus.LOAD_FAILURE
    +            // if no value is there in data load, make load status Success
    +            // and data load flow executes
    +            if (dataFrame.isDefined && updateModel.isEmpty) {
    +              val rdd = dataFrame.get.rdd
    +              if (rdd.partitions == null || rdd.partitions.length == 0) {
    +                LOGGER.warn("DataLoading finished. No data was loaded.")
    +                loadStatus = SegmentStatus.SUCCESS
    +              }
    +            } else {
    +              loadStatus = SegmentStatus.LOAD_FAILURE
    +            }
               }
    -        }
     
    -        if (loadStatus != SegmentStatus.LOAD_FAILURE &&
    -            partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    -          loadStatus = partitionStatus
    +          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
    +              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    +            loadStatus = partitionStatus
    +          }
             }
    +      } else {
    +        LOGGER.audit("Not able to acquire the segment lock for table " +
    --- End diff --
   
    Does this case ever occur?


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158510211
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -315,88 +314,100 @@ object CarbonDataRDDFactory {
    [... same hunk as quoted in the previous comment ...]
    +      } else {
    +        LOGGER.audit("Not able to acquire the segment lock for table " +
    --- End diff --
   
    no


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158519731
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
    @@ -124,27 +148,50 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
         return false;
       }
     
    +  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
    +      String metadataPath) {
    +    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
    +    for (LoadMetadataDetails oneLoad : currentDetails) {
    +      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
    +        return oneLoad;
    +      }
    +    }
    +    return null;
    +  }
    +
       public static boolean deleteLoadFoldersFromFileSystem(
           AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
    -      LoadMetadataDetails[] details) {
    +      LoadMetadataDetails[] details, String metadataPath) {
         boolean isDeleted = false;
     
         if (details != null && details.length != 0) {
           for (LoadMetadataDetails oneLoad : details) {
             if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
    -          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
    -          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    -          if (deletionStatus) {
    -            isDeleted = true;
    -            oneLoad.setVisibility("false");
    -            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    +              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
    +          try {
    +            if (segmentLock.lockWithRetries()) {
    --- End diff --
   
    This will add time to every data load while trying to take the lock when parallel loads happen. Better to add another `lockWithRetries` method that takes very little time to acquire the lock.
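    For reference, a minimal sketch of the kind of overload being suggested (the parameter names are assumptions for illustration, not the ICarbonLock API as it stood at the time of this review):

        public interface ICarbonLock {

          /** Existing method: retries with the configured retry count and timeout. */
          boolean lockWithRetries();

          /** Suggested overload: try at most retryCount times, waiting
           *  retryTimeout seconds between attempts, so that best-effort callers
           *  such as clean files return quickly when a segment is busy. */
          boolean lockWithRetries(int retryCount, int retryTimeout);

          boolean unlock();
        }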


---

[GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158520072
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---
    @@ -124,27 +148,50 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
         return false;
       }
     
    +  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
    +      String metadataPath) {
    +    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
    +    for (LoadMetadataDetails oneLoad : currentDetails) {
    +      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
    +        return oneLoad;
    +      }
    +    }
    +    return null;
    +  }
    +
       public static boolean deleteLoadFoldersFromFileSystem(
           AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
    -      LoadMetadataDetails[] details) {
    +      LoadMetadataDetails[] details, String metadataPath) {
         boolean isDeleted = false;
     
         if (details != null && details.length != 0) {
           for (LoadMetadataDetails oneLoad : details) {
             if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
    -          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
    -          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
    -          if (deletionStatus) {
    -            isDeleted = true;
    -            oneLoad.setVisibility("false");
    -            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
    +          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
    +              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
    +          try {
    +            if (segmentLock.lockWithRetries()) {
    --- End diff --
   
    Better to acquire locks only for segments with in-progress status, not for the others.
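    Concretely, the gate could be a small status check before taking the lock. A sketch, assuming the in-progress values of the SegmentStatus enum used elsewhere in this codebase:

        // Only in-progress segments can race with an active writer, so only they
        // need the per-segment lock; other statuses can be cleaned directly
        // without the locking overhead.
        private static boolean needsSegmentLock(LoadMetadataDetails oneLoad) {
          SegmentStatus status = oneLoad.getSegmentStatus();
          return status == SegmentStatus.INSERT_IN_PROGRESS
              || status == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS;
        }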


---

[GitHub] carbondata issue #1702: [CARBONDATA-1896] Clean files operation improvement

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1702
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1050/



---