[GitHub] [carbondata] Karan980 opened a new pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


[GitHub] [carbondata] Karan980 opened a new pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


Karan980 opened a new pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022


   ### Why is this PR needed?
   Previously, global sort was not supported during the data file merge operation on SI segments. So if an SI was created with global sort and carbon.si.segment.merge was set to true, the merge combined the data files in the SI segments but broke the globally sorted order.

   ### What changes were proposed in this PR?
   Added global sort support for the data file merge operation in SI segments.

   ### Does this PR introduce any user interface change?
   - No

   ### Is any new testcase added?
   - Yes
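
   As a rough illustration of the scenario this fixes (a hypothetical sketch: the table and index names are invented, and the exact SI syntax should be verified against the CarbonData docs):

       // Hypothetical reproduction (names are illustrative only).
       import org.apache.carbondata.core.util.CarbonProperties
       import org.apache.spark.sql.SparkSession

       val spark = SparkSession.builder().appName("si-merge-demo").getOrCreate()

       // Let small data files inside SI segments be merged after load.
       CarbonProperties.getInstance().addProperty("carbon.si.segment.merge", "true")

       spark.sql("CREATE TABLE sales (id INT, city STRING) STORED AS carbondata")
       // An SI whose data is globally sorted across each segment.
       spark.sql(
         """CREATE INDEX city_idx ON TABLE sales (city)
           |AS 'carbondata'
           |PROPERTIES ('sort_scope'='global_sort', 'global_sort_partitions'='2')""".stripMargin)

       // Before this PR, the SI segment merge rewrote the data files without
       // re-applying the global sort, so the merged files were no longer
       // globally sorted; after this PR, the merge itself sorts globally.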
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-732902364


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3132/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-732973817


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4886/
   





[GitHub] [carbondata] ajantha-bhat commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


ajantha-bhat commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734625127


   retest this please





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


ajantha-bhat commented on a change in pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#discussion_r531364008



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
     identifiedSegments
   }
 
+  /**
+   * This method deletes the old carbondata files.
+   */
+  private def deleteOldCarbonDataFiles(factTimeStamp: Long,
+              validSegments: util.List[Segment],
+              indexCarbonTable: CarbonTable): Unit = {
+    validSegments.asScala.foreach { segment =>
+      val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath,
+        segment.getSegmentNo)
+      val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+        }})
+      dataFiles.foreach(dataFile =>
+      if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) {
+        dataFile.delete()
+      })
+    }
+  }
+
+  def mergeSISegmentDataFiles(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = {
+    val validSegments = carbonMergerMapping.validSegments.toList
+    val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
+    val proj = indexCarbonTable.getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+    var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
+    val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
+    val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable)
+    CarbonIndexUtil.initializeSILoadModel(outputModel, header)
+    outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+    val segmentMetaDataAccumulator = sparkSession.sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+    validSegments.foreach { segment =>

Review comment:
       This can be a Spark job for multiple segments. Handling them sequentially is bad.
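
       A rough sketch (not part of this PR) of one way to do that, assuming each segment's merge can run as an independent Spark action; mergeOneSegment is a hypothetical stand-in for the existing per-segment logic:

           // Hypothetical: submit the per-segment merges concurrently from
           // the driver instead of looping over them sequentially.
           import scala.concurrent.{Await, Future}
           import scala.concurrent.ExecutionContext.Implicits.global
           import scala.concurrent.duration.Duration

           def mergeSegmentsInParallel(segmentIds: Seq[String])
                                      (mergeOneSegment: String => Unit): Unit = {
             val jobs = segmentIds.map(id => Future(mergeOneSegment(id)))
             // Fails fast on the first error; otherwise waits for all merges.
             Await.result(Future.sequence(jobs), Duration.Inf)
           }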







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734646180


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3184/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734646663


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4940/
   





[GitHub] [carbondata] Karan980 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


Karan980 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734650646


   retest this please





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734682627


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4944/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


CarbonDataQA2 commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734683112


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3189/
   





[GitHub] [carbondata] Karan980 commented on a change in pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


Karan980 commented on a change in pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#discussion_r531439761



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
+    validSegments.foreach { segment =>

Review comment:
       It can be handled in a separate PR. I have raised a JIRA ticket for it.







[GitHub] [carbondata] ajantha-bhat commented on pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


ajantha-bhat commented on pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#issuecomment-734788102


   LGTM.
   
   Please handle the problem mentioned above in another PR soon; otherwise we can't use SI merge with global sort for old tables.





[GitHub] [carbondata] asfgit closed pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


asfgit closed pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022


   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


ajantha-bhat commented on a change in pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#discussion_r531546338



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
+    validSegments.foreach { segment =>

Review comment:
       Please reply with the JIRA id here as well.







[GitHub] [carbondata] Karan980 commented on a change in pull request #4022: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.


Karan980 commented on a change in pull request #4022:
URL: https://github.com/apache/carbondata/pull/4022#discussion_r531636655



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -617,4 +637,157 @@ object SecondaryIndexUtil {
+    validSegments.foreach { segment =>

Review comment:
       CARBONDATA-4060



