manishnalla1994 commented on a change in pull request #3392: [WIP] Supporting mixed formats in carbon
URL: https://github.com/apache/carbondata/pull/3392#discussion_r329479063 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala ########## @@ -0,0 +1,200 @@ +package org.apache.spark.sql.execution.strategy + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.{SparkSession, execution} +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.hive.orc.OrcFileFormat +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.SparkSQLUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormat} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object MixedFormatHandler { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + val supportedFormats: Seq[String] = + Seq("carbon", 
"carbondata", "parquet", "orc", "json", "csv", "text") + + def validateFormat(format: String): Boolean = { + supportedFormats.exists(_.equalsIgnoreCase(format)) + } + + def getSchema(sparkSession: SparkSession, + options: Map[String, String], + segPath: String): StructType = { + val format = options.getOrElse("format", "carbondata") + if ((format.equals("carbondata") || format.equals("carbon"))) { + new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get + } else { + val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/")) + val path = new Path(filePath) + val fs = path.getFileSystem(SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) + val status = fs.listStatus(path, new PathFilter { + override def accept(path: Path): Boolean = + !path.getName.equals("_SUCCESS") && !path.getName.endsWith(".crc") + }) + getFileFormat(new CarbonFileFormat(format)).inferSchema(sparkSession, options, status).get + } + } + + def extraRDD(l: LogicalRelation, + projects: Seq[NamedExpression], + filters: Seq[Expression], + readCommittedScope: ReadCommittedScope): Option[RDD[InternalRow]] = { + val loadMetadataDetails = readCommittedScope.getSegmentList + val rdds = loadMetadataDetails.filterNot(l => + l.getFileFormat.equals(CarbonFileFormat.COLUMNAR_V3) || + l.getFileFormat.equals(CarbonFileFormat.ROW_V1)) Review comment: ```suggestion (l.getFileFormat.equals(CarbonFileFormat.COLUMNAR_V3) || l.getFileFormat.equals(CarbonFileFormat.ROW_V1)) || (!(l.getSegmentStatus.equals(SegmentStatus.SUCCESS) || l.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS)))) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
Free forum by Nabble | Edit this page |