[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...


[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157346945
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -316,16 +332,35 @@ case class CarbonLoadDataCommand(
         } else {
           dataFrame
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      server,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +
    +    if (carbonTable.isStandardPartitionTable) {
    +      try {
    +        loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +      } finally {
    +        server match {
    +          case Some(dictServer) =>
    +            try {
    +              dictServer.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +                .getCarbonTableIdentifier.getTableId)
    +            } catch {
    +              case _: Exception =>
    +                throw new Exception("Dataload failed due to error while writing dictionary file!")
    +            }
    +          case _ =>
    +        }
    +      }
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157346996
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157346999
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    +      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
    +      partition,
    +      query,
    +      OverwriteOptions(isOverwriteTable), false))
    +  }
    +
    +  private def convertToLogicalRelation(
    +      relation: LogicalRelation,
    +      overWrite: Boolean,
    +      loadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): LogicalRelation = {
    +    val catalogTable = relation.catalogTable.get
    +    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val metastoreSchema = StructType(StructType.fromAttributes(
    +      relation.output).fields.map(_.copy(dataType = StringType)))
    +    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    +    val catalog = new CatalogFileIndex(
    +      sparkSession, catalogTable, relation.relation.sizeInBytes)
    +    if (lazyPruningEnabled) {
    +      catalog
    +    } else {
    +      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    +    }
    +    val partitionSchema =
    +      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
    +      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
    +
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157347018
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    +      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
    +      partition,
    +      query,
    +      OverwriteOptions(isOverwriteTable), false))
    +  }
    +
    +  private def convertToLogicalRelation(
    +      relation: LogicalRelation,
    +      overWrite: Boolean,
    +      loadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): LogicalRelation = {
    +    val catalogTable = relation.catalogTable.get
    +    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val metastoreSchema = StructType(StructType.fromAttributes(
    +      relation.output).fields.map(_.copy(dataType = StringType)))
    +    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    +    val catalog = new CatalogFileIndex(
    +      sparkSession, catalogTable, relation.relation.sizeInBytes)
    +    if (lazyPruningEnabled) {
    +      catalog
    +    } else {
    +      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    +    }
    +    val partitionSchema =
    +      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
    +      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
    --- End diff --
   
    ok


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/830/



---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157347031
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.io.File
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Random
    +
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.types.{DataType, StructType}
    +
    +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
    +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
    +
    +class CarbonFileFormat
    +  extends FileFormat
    +    with DataSourceRegister
    +    with Logging
    +with Serializable {
    +
    +  override def shortName(): String = "carbondata"
    +
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    None
    +  }
    +
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val conf = job.getConfiguration
    +    conf.setClass(
    +      SQLConf.OUTPUT_COMMITTER_CLASS.key,
    +      classOf[CarbonOutputCommitter],
    +      classOf[CarbonOutputCommitter])
    +    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
    +    sparkSession.sessionState.conf.setConfString(
    +      "spark.sql.sources.commitProtocolClass",
    +      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
    +    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
    +
    +    var table = CarbonMetadata.getInstance().getCarbonTable(
    +      options.getOrElse("dbName", "default"), options("tableName"))
    +//    table = CarbonTable.buildFromTableInfo(table.getTableInfo, true)
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157347049
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.io.File
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Random
    +
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.types.{DataType, StructType}
    +
    +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
    +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
    +
    +class CarbonFileFormat
    +  extends FileFormat
    +    with DataSourceRegister
    +    with Logging
    +with Serializable {
    +
    +  override def shortName(): String = "carbondata"
    +
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    None
    +  }
    +
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val conf = job.getConfiguration
    +    conf.setClass(
    +      SQLConf.OUTPUT_COMMITTER_CLASS.key,
    +      classOf[CarbonOutputCommitter],
    +      classOf[CarbonOutputCommitter])
    +    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
    +    sparkSession.sessionState.conf.setConfString(
    +      "spark.sql.sources.commitProtocolClass",
    +      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
    +    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
    +
    +    var table = CarbonMetadata.getInstance().getCarbonTable(
    +      options.getOrElse("dbName", "default"), options("tableName"))
    +//    table = CarbonTable.buildFromTableInfo(table.getTableInfo, true)
    +    val model = new CarbonLoadModel
    +    val carbonProperty = CarbonProperties.getInstance()
    +    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
    +    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
    +    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
    +      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
    +        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
    +          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
    +    val partitionStr =
    +      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
    +        _.getColumnName.toLowerCase).mkString(",")
    +    optionsFinal.put(
    +      "fileheader",
    +      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
    +    DataLoadingUtil.buildCarbonLoadModel(
    +      table,
    +      carbonProperty,
    +      options,
    +      optionsFinal,
    +      model,
    +      conf
    +    )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +    model.setPartitionId("0")
    +    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
    +    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
    +    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
    +    CarbonTableOutputFormat.setLoadModel(conf, model)
    +    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
    +
    +    new OutputWriterFactory {
    +
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
    +        var storeLocation: Array[String] = Array[String]()
    +        val isCarbonUseLocalDir = CarbonProperties.getInstance()
    +          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
    +        val tmpLocationSuffix = File.separator + System.nanoTime()
    +        if (isCarbonUseLocalDir) {
    +          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
    +          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
    +            // use single dir
    +            storeLocation = storeLocation :+
    +              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
    +            if (storeLocation == null || storeLocation.isEmpty) {
    +              storeLocation = storeLocation :+
    +                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
    +            }
    +          } else {
    +            // use all the yarn dirs
    +            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
    +          }
    +        } else {
    +          storeLocation =
    +            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
    +        }
    +        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
    +        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        ".carbondata"
    --- End diff --
   
    ok


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2366/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/834/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/835/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2061/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2396/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/884/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2110/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    SDV build succeeded. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2399/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/887/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2112/



---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157646660
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---
    @@ -152,6 +154,22 @@ object CarbonScalaUtil {
         }
       }
     
    +  def getString(value: String,
    --- End diff --
   
    Move each parameter to its own line, and add a comment for this function.
    Maybe renaming it to `convertToUTF8String` would be better.
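
    For illustration, a minimal sketch of the suggested shape. The parameter
    list follows the CSV-branch call site quoted earlier in this thread, and
    the body is an assumption about what the conversion does, not the PR's
    actual implementation:

        import java.text.SimpleDateFormat

        import org.apache.spark.sql.types.{DataType, DateType, TimestampType}

        /**
         * Converts a raw CSV field to the string form expected by the loader.
         * Timestamp and date values are parsed and re-emitted with the load
         * model's configured formats; all other types pass through unchanged.
         * (Sketch only: the real CarbonScalaUtil body is not shown in this
         * thread.)
         */
        def convertToUTF8String(
            value: String,
            dataType: DataType,
            timeStampFormat: SimpleDateFormat,
            dateFormat: SimpleDateFormat): String = {
          if (value == null) {
            value
          } else {
            dataType match {
              case TimestampType => timeStampFormat.format(timeStampFormat.parse(value))
              case DateType => dateFormat.format(dateFormat.parse(value))
              case _ => value
            }
          }
        }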


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157646748
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
     
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
    -
           val attributes =
             StructType(carbonLoadModel.getCsvHeaderColumns.map(
               StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    --- End diff --
   
    Change `f` to `attribute`, and add a space after `map`.
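
    Applied to the quoted snippet, the suggestion reads as below; this only
    restates the diff above with the new naming, so the behavior is unchanged:

        val rowDataTypes = attributes.map { attribute =>
          // Use the table's declared type for the column; fall back to string
          // for CSV header columns that are not part of the relation output.
          relation.output.find(_.name.equalsIgnoreCase(attribute.name)) match {
            case Some(attr) => attr.dataType
            case _ => StringType
          }
        }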


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157646893
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
     
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
    -
           val attributes =
             StructType(carbonLoadModel.getCsvHeaderColumns.map(
               StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    --- End diff --
   
    It would be better to use `case (key, value)` instead of `f`.
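
    Applied to the quoted snippet, that could look like the sketch below. The
    key is never read, so it can be discarded in the pattern; the behavior is
    unchanged from the diff above:

        jobConf).map { case (_, value) =>
          val data = new Array[Any](len)
          var i = 0
          while (i < len) {
            // TODO find a way to avoid double conversion of date and time.
            data(i) = CarbonScalaUtil.getString(
              value.get()(i),
              rowDataTypes(i),
              timeStampFormat,
              dateFormat)
            i = i + 1
          }
          InternalRow.fromSeq(data)
        }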


---