[GitHub] [carbondata] jackylk commented on a change in pull request #3431: [CARBONDATA-3566] Support add segment for partition table


GitBox
jackylk commented on a change in pull request #3431: [CARBONDATA-3566] Support add segment for partition table
URL: https://github.com/apache/carbondata/pull/3431#discussion_r345164229
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
 ##########
 @@ -56,23 +59,75 @@ object MixedFormatHandler {
     supportedFormats.exists(_.equalsIgnoreCase(format))
   }
 
-  def getSchema(sparkSession: SparkSession,
+  /**
 +   * Collect the schema, the list of last-level directories, and the list of all data files
 +   * under the given path.
 +   *
 +   * @param sparkSession spark session
 +   * @param options options of the ADD SEGMENT command
 +   * @param inputPath path under which to collect
 +   * @return schema of the data files, and a map of each last-level directory (partition
 +   *         folder) to its children file list (data files)
+   */
+  def collectInfo(
+      sparkSession: SparkSession,
       options: Map[String, String],
-      segPath: String): StructType = {
-    val format = options.getOrElse("format", "carbondata")
-    if ((format.equals("carbondata") || format.equals("carbon"))) {
-      new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
+      inputPath: String): (StructType, mutable.Map[String, Seq[FileStatus]]) = {
+    val path = new Path(inputPath)
+    val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+    val rootPath = fs.getFileStatus(path)
+    val leafDirFileMap = collectAllLeafFileStatus(sparkSession, rootPath, fs)
+    val format = options.getOrElse("format", "carbondata").toLowerCase
 +    val fileFormat = if (format.equals("carbondata") || format.equals("carbon")) {
+      new SparkCarbonFileFormat()
     } else {
-      val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/"))
-      val path = new Path(filePath)
-      val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
-      val status = fs.listStatus(path, new PathFilter {
-        override def accept(path: Path): Boolean = {
-          !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
-        }
-      })
-      getFileFormat(new FileFormatName(format)).inferSchema(sparkSession, options, status).get
+      getFileFormat(new FileFormatName(format))
+    }
+    if (leafDirFileMap.isEmpty) {
 +      throw new RuntimeException("no partition data found under given path")
+    }
+    val schema = fileFormat.inferSchema(sparkSession, options, leafDirFileMap.head._2).get
+    (schema, leafDirFileMap)
+  }
+
+  /**
 +   * Recursively collect leaf directories and leaf files under the given path.
 +   *
 +   * @param sparkSession spark session
 +   * @param path path to collect from
 +   * @param fs hadoop file system
 +   * @return mapping of each leaf directory to its children files
+   */
+  private def collectAllLeafFileStatus(
+      sparkSession: SparkSession,
+      path: FileStatus,
+      fs: FileSystem): mutable.Map[String, Seq[FileStatus]] = {
+    val directories: ArrayBuffer[FileStatus] = ArrayBuffer()
+    val leafFiles: ArrayBuffer[FileStatus] = ArrayBuffer()
+    val lastLevelFileMap = mutable.Map[String, Seq[FileStatus]]()
+
+    // get all files under input path
+    val fileStatus = fs.listStatus(path.getPath, new PathFilter {
+      override def accept(path: Path): Boolean = {
+        !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc")
+      }
+    })
+    // collect directories and files
+    fileStatus.foreach { file =>
+      if (file.isDirectory) directories.append(file)
+      else leafFiles.append(file)
+    }
+    if (leafFiles.nonEmpty) {
 +      // leaf files were found, so the parent folder (the input path) is a last-level dir
+      val updatedPath = FileFactory.getUpdatedFilePath(path.getPath.toString)
+      lastLevelFileMap.put(updatedPath, leafFiles)
+      return lastLevelFileMap
 
 Review comment:
   fixed
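
For readers following the thread, here is a minimal standalone sketch of the recursive leaf-directory collection the diff implements. It assumes only a Hadoop `FileSystem` handle; the object and method names (`LeafDirCollector`, `collectLeafDirs`) are illustrative and not part of the PR, and unlike `collectAllLeafFileStatus`, which returns as soon as leaf files are found, this variant also descends into sibling sub-directories.

```scala
import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

object LeafDirCollector {
  // Skip Spark's _SUCCESS marker and Hadoop .crc checksum files,
  // mirroring the PathFilter used in the diff above.
  private val dataFileFilter = new PathFilter {
    override def accept(p: Path): Boolean =
      !p.getName.equals("_SUCCESS") && !p.getName.endsWith(".crc")
  }

  // Map each last-level (leaf) directory to the data files directly
  // under it, the same shape collectAllLeafFileStatus returns.
  def collectLeafDirs(fs: FileSystem, dir: Path): mutable.Map[String, Seq[FileStatus]] = {
    val result = mutable.Map[String, Seq[FileStatus]]()
    val (dirs, files) = fs.listStatus(dir, dataFileFilter).partition(_.isDirectory)
    if (files.nonEmpty) {
      // data files sit directly under `dir`, so `dir` is a leaf directory
      result.put(dir.toString, files.toSeq)
    }
    // recurse into sub-directories, e.g. partition folders like a=1/b=2
    dirs.foreach(d => result ++= collectLeafDirs(fs, d.getPath))
    result
  }
}
```

Under these assumptions, calling `collectLeafDirs(fs, new Path("/warehouse/t1"))` on a two-level partitioned layout yields one map entry per partition folder such as `.../a=1/b=2`, which is the shape `collectInfo` passes on to `inferSchema`.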

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services