[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
GitHub user ravipesala opened a pull request:

    https://github.com/apache/carbondata/pull/1672

    [CARBONDATA-1858][PARTITION] Support querying data from partition table.

    This PR depends on https://github.com/apache/carbondata/pull/1642 and https://github.com/apache/carbondata/pull/1654.
    For a partition table, first use the session catalog to prune the partitions. Using the pruned partition information, the datamap then reads the partition.map file to get the index files and the corresponding blocklets to prune.
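
The two-step flow described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `PartitionSpec`, `prunePartitions`, and `indexFilesFor` are hypothetical names, the in-memory `Map` only stands in for whatever the partition.map file actually stores, and the filter function stands in for the pruning done through the session catalog.

```scala
// Hypothetical model of the two-step pruning; none of these names come from
// the CarbonData or Spark code bases.
object PartitionPruningSketch {
  // A partition is identified by its column values, e.g. Map("country" -> "IN").
  type PartitionSpec = Map[String, String]

  // Step 1 (stand-in for the session catalog): keep only matching partitions.
  def prunePartitions(
      allPartitions: Seq[PartitionSpec],
      filter: PartitionSpec => Boolean): Seq[PartitionSpec] =
    allPartitions.filter(filter)

  // Step 2 (stand-in for reading partition.map): collect the index files
  // recorded for each partition that survived step 1.
  def indexFilesFor(
      partitionMap: Map[PartitionSpec, Seq[String]],
      pruned: Seq[PartitionSpec]): Seq[String] =
    pruned.flatMap(p => partitionMap.getOrElse(p, Seq.empty))
}
```

Only the index files returned by the second step would then need to be consulted for blocklet pruning, which is the point of pruning partitions first.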
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata partition-read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1672.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1672
   
----
commit 705c15e739e9270f3cd3899b1e2349679532164c
Author: ravipesala <[hidden email]>
Date:   2017-12-04T10:37:03Z

    Added outputformat for carbon

commit 9c56f892aab33d2d016e41b05f32df415b349553
Author: ravipesala <[hidden email]>
Date:   2017-12-12T06:12:45Z

    Added fileformat in carbon

commit 21c95514f7f005c32a48c0cc884cefaaa0b3f4a6
Author: ravipesala <[hidden email]>
Date:   2017-12-15T19:18:19Z

    Added support to query using standard partitions

----


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2027/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/800/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2341/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/814/



---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2045/



---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339584
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -1394,20 +1394,20 @@ public static String printLine(String a, int num) {
        * Below method will be used to get the list of segment in
        * comma separated string format
        *
    -   * @param segmentList
    +   * @param strings
        * @return comma separated segment string
        */
    -  public static String getSegmentString(List<String> segmentList) {
    -    if (segmentList.isEmpty()) {
    +  public static String convertToString(List<String> strings) {
    --- End diff --
   
    Please rename the input parameter to something more readable, and update the comment for this function.


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2356/



---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339661
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ---
    @@ -40,7 +41,11 @@ case class CarbonInsertIntoCommand(
           scala.collection.immutable.Map("fileheader" -> header),
           overwrite,
           null,
    -      Some(df)).run(sparkSession)
    +      Some(df),
    --- End diff --
   
    Please also add the parameter names; without them it is hard to check which argument is which. Change it to something like:
    ```
       foo(
        paramA = a,
        paramB = b
        ...)
    ```
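
As a concrete, self-contained illustration of the named-argument style the reviewer asks for, here is a minimal sketch. `LoadOptions` and its fields are hypothetical and are not the parameters of the command under review; they only show how two adjacent `Boolean` arguments become unambiguous once they are labelled.

```scala
object NamedArgumentsSketch {
  // Hypothetical stand-in for a command with several same-typed parameters.
  case class LoadOptions(
      tableName: String,
      isOverwrite: Boolean,
      useDictionaryServer: Boolean,
      inputPath: Option[String])

  // Positional call: the reader must remember which Boolean is which.
  val positional = LoadOptions("sales", true, false, None)

  // Named call: each argument is labelled at the call site.
  val named = LoadOptions(
    tableName = "sales",
    isOverwrite = true,
    useDictionaryServer = false,
    inputPath = None)
}
```

Both calls build the same value; the named form is longer but lets a reviewer verify each argument without opening the definition.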


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339670
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -316,16 +332,35 @@ case class CarbonLoadDataCommand(
         } else {
           dataFrame
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      server,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +
    +    if (carbonTable.isStandardPartitionTable) {
    +      try {
    +        loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +      } finally {
    +        server match {
    +          case Some(dictServer) =>
    +            try {
    +              dictServer.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +                .getCarbonTableIdentifier.getTableId)
    +            } catch {
    +              case _: Exception =>
    +                throw new Exception("Dataload failed due to error while writing dictionary file!")
    +            }
    +          case _ =>
    +        }
    +      }
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    --- End diff --
   
    Move `sparkSession.sqlContext` to the next line.


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339682
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    --- End diff --
   
    Move the parameters to separate lines.


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339685
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    +      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
    +      partition,
    +      query,
    +      OverwriteOptions(isOverwriteTable), false))
    +  }
    +
    +  private def convertToLogicalRelation(
    +      relation: LogicalRelation,
    +      overWrite: Boolean,
    +      loadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): LogicalRelation = {
    +    val catalogTable = relation.catalogTable.get
    +    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val metastoreSchema = StructType(StructType.fromAttributes(
    +      relation.output).fields.map(_.copy(dataType = StringType)))
    +    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    +    val catalog = new CatalogFileIndex(
    +      sparkSession, catalogTable, relation.relation.sizeInBytes)
    +    if (lazyPruningEnabled) {
    +      catalog
    +    } else {
    +      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    +    }
    +    val partitionSchema =
    +      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
    +      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
    +
    --- End diff --
   
    Remove the empty line.


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339790
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
    +
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    +      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
    +      partition,
    +      query,
    +      OverwriteOptions(isOverwriteTable), false))
    +  }
    +
    +  private def convertToLogicalRelation(
    +      relation: LogicalRelation,
    +      overWrite: Boolean,
    +      loadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): LogicalRelation = {
    +    val catalogTable = relation.catalogTable.get
    +    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val metastoreSchema = StructType(StructType.fromAttributes(
    +      relation.output).fields.map(_.copy(dataType = StringType)))
    +    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    +    val catalog = new CatalogFileIndex(
    +      sparkSession, catalogTable, relation.relation.sizeInBytes)
    +    if (lazyPruningEnabled) {
    +      catalog
    +    } else {
    +      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    +    }
    +    val partitionSchema =
    +      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
    +      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
    --- End diff --
   
    Can you make this more readable? It is not easy to follow the `map(_.get)`.
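
One possible way to address this readability concern is sketched below. It is an assumption about what a clearer version could look like, not the change the author eventually made. `Field` is a hypothetical stand-in for Spark's `StructField`, and `partitionFields` is an illustrative helper name. The lookup and the unwrapping happen in one step, and a missing column fails with a descriptive message instead of the bare `NoSuchElementException` that `map(_.get)` would raise.

```scala
object PartitionSchemaSketch {
  // Hypothetical stand-in for a schema field; not Spark's StructField.
  case class Field(name: String, dataType: String)

  // Resolve each partition column against the table schema, ignoring case.
  // Fails fast with a clear message when a partition column is unknown.
  def partitionFields(schema: Seq[Field], partitionColumns: Seq[String]): Seq[Field] =
    partitionColumns.map { column =>
      schema
        .find(_.name.equalsIgnoreCase(column))
        .getOrElse(throw new IllegalArgumentException(
          s"Partition column '$column' not found in table schema"))
    }
}
```

Extracting the lookup into a named helper also keeps the call site short, which is where the original expression was hardest to read.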


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339822
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.io.File
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Random
    +
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.types.{DataType, StructType}
    +
    +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
    +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
    +
    +class CarbonFileFormat
    +  extends FileFormat
    +    with DataSourceRegister
    +    with Logging
    +with Serializable {
    +
    +  override def shortName(): String = "carbondata"
    +
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    None
    +  }
    +
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val conf = job.getConfiguration
    +    conf.setClass(
    +      SQLConf.OUTPUT_COMMITTER_CLASS.key,
    +      classOf[CarbonOutputCommitter],
    +      classOf[CarbonOutputCommitter])
    +    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
    +    sparkSession.sessionState.conf.setConfString(
    +      "spark.sql.sources.commitProtocolClass",
    +      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
    +    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
    +
    +    var table = CarbonMetadata.getInstance().getCarbonTable(
    +      options.getOrElse("dbName", "default"), options("tableName"))
    +//    table = CarbonTable.buildFromTableInfo(table.getTableInfo, true)
    --- End diff --
   
    Remove this commented-out line.


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157339835
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.io.File
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Random
    +
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.types.{DataType, StructType}
    +
    +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
    +import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionFileStore}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
    +
    +class CarbonFileFormat
    +  extends FileFormat
    +    with DataSourceRegister
    +    with Logging
    +with Serializable {
    +
    +  override def shortName(): String = "carbondata"
    +
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    None
    +  }
    +
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val conf = job.getConfiguration
    +    conf.setClass(
    +      SQLConf.OUTPUT_COMMITTER_CLASS.key,
    +      classOf[CarbonOutputCommitter],
    +      classOf[CarbonOutputCommitter])
    +    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
    +    sparkSession.sessionState.conf.setConfString(
    +      "spark.sql.sources.commitProtocolClass",
    +      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
    +    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
    +
    +    var table = CarbonMetadata.getInstance().getCarbonTable(
    +      options.getOrElse("dbName", "default"), options("tableName"))
    +//    table = CarbonTable.buildFromTableInfo(table.getTableInfo, true)
    +    val model = new CarbonLoadModel
    +    val carbonProperty = CarbonProperties.getInstance()
    +    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
    +    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
    +    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
    +      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
    +        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
    +          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
    +    val partitionStr =
    +      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
    +        _.getColumnName.toLowerCase).mkString(",")
    +    optionsFinal.put(
    +      "fileheader",
    +      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
    +    DataLoadingUtil.buildCarbonLoadModel(
    +      table,
    +      carbonProperty,
    +      options,
    +      optionsFinal,
    +      model,
    +      conf
    +    )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +    model.setPartitionId("0")
    +    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
    +    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
    +    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
    +    CarbonTableOutputFormat.setLoadModel(conf, model)
    +    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
    +
    +    new OutputWriterFactory {
    +
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
    +        var storeLocation: Array[String] = Array[String]()
    +        val isCarbonUseLocalDir = CarbonProperties.getInstance()
    +          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
    +        val tmpLocationSuffix = File.separator + System.nanoTime()
    +        if (isCarbonUseLocalDir) {
    +          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
    +          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
    +            // use single dir
    +            storeLocation = storeLocation :+
    +              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
    +            if (storeLocation == null || storeLocation.isEmpty) {
    +              storeLocation = storeLocation :+
    +                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
    +            }
    +          } else {
    +            // use all the yarn dirs
    +            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
    +          }
    +        } else {
    +          storeLocation =
    +            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
    +        }
    +        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
    +        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        ".carbondata"
    --- End diff --
   
    There is already a constant for this file extension; use it instead of the string literal.
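
The point of this comment can be sketched as follows: keep the extension in one named constant and reference it wherever a file name is built or checked. This is a minimal illustration in Java rather than the Scala of the diff, and `FileExtensions`, `DATA_FILE_EXT`, `dataFileName` and `isDataFile` are hypothetical names, not the constant that exists in CarbonData.

```java
// Hypothetical constants holder for illustration; not the real project constant.
final class FileExtensions {

  // Single definition of the extension, shared by every caller.
  static final String DATA_FILE_EXT = ".carbondata";

  private FileExtensions() { }

  /** Builds a data file name from a base name using the shared constant. */
  static String dataFileName(String baseName) {
    return baseName + DATA_FILE_EXT;
  }

  /** Returns true if the given file name ends with the data file extension. */
  static boolean isDataFile(String fileName) {
    return fileName.endsWith(DATA_FILE_EXT);
  }

  public static void main(String[] args) {
    String name = dataFileName("part-0-0");
    System.out.println(name + " " + isDataFile(name));
  }
}
```

With a single definition, a later change to the extension touches one line, and a typo in the literal cannot silently diverge between the writer and the reader side.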


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2057/



---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user ravipesala closed the pull request at:

    https://github.com/apache/carbondata/pull/1672


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user ravipesala reopened a pull request:

    https://github.com/apache/carbondata/pull/1672

    [CARBONDATA-1858][PARTITION] Support querying data from partition table.

    This PR depends on https://github.com/apache/carbondata/pull/1642 and https://github.com/apache/carbondata/pull/1654.
    For a partition table, first use the session catalog to prune the partitions. With the resulting partition information, the datamap should read the partition.map file to find the index files and the corresponding blocklets to prune.
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata partition-read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1672.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1672
   
----
commit 61f54a1e564401febd1803adc82b10719babeb1b
Author: ravipesala <[hidden email]>
Date:   2017-12-04T10:37:03Z

    Added outputformat for carbon

commit ff5393df80bb23a92783761abc20fff4463fc66c
Author: ravipesala <[hidden email]>
Date:   2017-12-12T06:12:45Z

    Added fileformat in carbon

commit aafe97ac86f4818eeab94afb3e4f1a5c6ee74774
Author: ravipesala <[hidden email]>
Date:   2017-12-15T19:18:19Z

    Added support to query using standard partitions

----
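
The two-step flow in the pull request description above (prune partitions first, then resolve the surviving partitions to index files) can be sketched roughly as below. This is an illustration under stated assumptions, not the PR's implementation: the layout of partition.map is not shown in this thread, so it is modeled here as a plain in-memory map from a partition string to index file names, and every name (`PartitionPruneSketch`, `prunePartitions`, `indexFilesFor`, the `p=...` and `idx-...` values) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative only: the map layout and all names here are assumptions.
final class PartitionPruneSketch {

  private PartitionPruneSketch() { }

  /** Step 1: keep only the partitions that satisfy the query's partition filter. */
  static List<String> prunePartitions(List<String> allPartitions, Predicate<String> filter) {
    List<String> kept = new ArrayList<>();
    for (String partition : allPartitions) {
      if (filter.test(partition)) {
        kept.add(partition);
      }
    }
    return kept;
  }

  /** Step 2: collect the index files recorded for the surviving partitions. */
  static List<String> indexFilesFor(
      List<String> partitions, Map<String, List<String>> partitionToIndexFiles) {
    List<String> indexFiles = new ArrayList<>();
    for (String partition : partitions) {
      List<String> files = partitionToIndexFiles.get(partition);
      if (files != null) {
        indexFiles.addAll(files);
      }
    }
    return indexFiles;
  }

  public static void main(String[] args) {
    // Stand-in for the contents of a partition.map file (assumed shape).
    Map<String, List<String>> mapping = new LinkedHashMap<>();
    mapping.put("p=a", Arrays.asList("idx-0"));
    mapping.put("p=b", Arrays.asList("idx-1", "idx-2"));

    List<String> kept =
        prunePartitions(new ArrayList<>(mapping.keySet()), p -> p.equals("p=b"));
    System.out.println(indexFilesFor(kept, mapping));
  }
}
```

Only the index files returned by the second step would then need to be opened for blocklet-level pruning, which is where the saving over scanning every index file comes from.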


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157346845
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -1394,20 +1394,20 @@ public static String printLine(String a, int num) {
        * Below method will be used to get the list of segment in
        * comma separated string format
        *
    -   * @param segmentList
    +   * @param strings
        * @return comma separated segment string
        */
    -  public static String getSegmentString(List<String> segmentList) {
    -    if (segmentList.isEmpty()) {
    +  public static String convertToString(List<String> strings) {
    --- End diff --
   
    ok
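
For readers following the rename: the Javadoc in the diff says the method returns the list as a comma-separated string. A minimal sketch of such a helper is shown below. The class name `StringListUtil` is hypothetical, and returning an empty string for an empty list is an assumption, since the diff is cut off before the method body.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the renamed helper; not the CarbonUtil source.
final class StringListUtil {

  private StringListUtil() { }

  /** Joins the given strings with commas; an empty list yields "" (assumed). */
  static String convertToString(List<String> strings) {
    if (strings.isEmpty()) {
      return "";
    }
    return String.join(",", strings);
  }

  public static void main(String[] args) {
    System.out.println(convertToString(Arrays.asList("0", "1", "2")));
    System.out.println(convertToString(Collections.<String>emptyList()).isEmpty());
  }
}
```

The generic name fits once the helper is no longer used only for segment lists, which appears to be the motivation for the rename in the diff.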


---

[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157346926
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ---
    @@ -40,7 +41,11 @@ case class CarbonInsertIntoCommand(
           scala.collection.immutable.Map("fileheader" -> header),
           overwrite,
           null,
    -      Some(df)).run(sparkSession)
    +      Some(df),
    --- End diff --
   
    ok


---