[GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...

84 messages

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157647527
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
           case c: CatalogRelation => c
         }.head.asInstanceOf[LogicalPlan]
     
    +
         val query: LogicalPlan = if (dataFrame.isDefined) {
    -      val timeStampformatString = CarbonProperties.getInstance()
    -        .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    -          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
           val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
    -        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
           val dateFormat = new SimpleDateFormat(dateFormatString)
    --- End diff --
   
    move `timeStampformatString`, `timeStampFormat`, `dateFormatString`, and `dateFormat` before line 451; they are used in both the if and else branches
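
    A minimal sketch of what this suggestion amounts to (the format strings below are assumed stand-ins for the `CarbonCommonConstants` values, not code from the PR): hoist the four values above the `if`/`else` so both branches can share them.

    ```scala
    import java.text.SimpleDateFormat

    // Assumed stand-ins for CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    // and CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT.
    val timeStampformatString = "yyyy-MM-dd HH:mm:ss"
    val dateFormatString = "yyyy-MM-dd"

    // Hoisted once, before the if/else, so both branches can reuse them.
    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    val dateFormat = new SimpleDateFormat(dateFormatString)
    ```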


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157649340
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
           case c: CatalogRelation => c
         }.head.asInstanceOf[LogicalPlan]
     
    +
    --- End diff --
   
    comment for line 437 (function signature):
    1. suggest renaming `loadDataWithPartition`
    2. from the signature, a user cannot easily tell why it is for partitions; no parameter is partition related. It is similar to `CarbonDataRDDFactory.loadCarbonData` only; can we make it more readable?
    3. there is a lot of repeated code in the if block and the else block; can we extract the common part into another function?


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157649480
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -98,6 +97,8 @@ with Serializable {
           model,
           conf
         )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    --- End diff --
   
    why not use value from CarbonProperties?


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157649569
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -130,6 +130,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           table.carbonTable.getTableInfo.serialize())
       }
     
    +  protected def pruneFilterProject(
    --- End diff --
   
    add comment


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157652020
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -313,8 +330,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     
       def getDataSourceScan(relation: LogicalRelation,
    --- End diff --
   
    can be private


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157652034
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -313,8 +330,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     
       def getDataSourceScan(relation: LogicalRelation,
    --- End diff --
   
    can be private


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157652168
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier): Seq[String] = {
    +    val partitions =
    +      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
    +    partitions.toList.flatMap { f =>
    --- End diff --
   
    can you use `case (xxx, xxx)` to replace `f`, to make it more readable?
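
    A minimal illustration of the suggestion, using plain tuples as a hypothetical stand-in for the catalog partition objects returned by `listPartitionsByFilter`:

    ```scala
    // Hypothetical partition data: (partition spec, location) pairs.
    val partitions = List(
      (Map("country" -> "us"), "/tablePath/country=us"),
      (Map("country" -> "cn"), "/tablePath/country=cn"))

    // Destructuring with `case` names the fields instead of using f._1/f._2.
    val partitionStrings = partitions.flatMap { case (spec, _) =>
      spec.map { case (col, value) => s"$col=$value" }
    }
    ```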


---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157652296
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    --- End diff --
   
    I think it is better to do this logic in `pruneFilterProject` directly; this function is not needed


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157653185
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---
    @@ -152,6 +154,22 @@ object CarbonScalaUtil {
         }
       }
     
    +  def getString(value: String,
    --- End diff --
   
    ok


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157653275
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
     
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
    -
           val attributes =
             StructType(carbonLoadModel.getCsvHeaderColumns.map(
               StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    --- End diff --
   
    ok


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157653428
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
     
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
    -
           val attributes =
             StructType(carbonLoadModel.getCsvHeaderColumns.map(
               StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    --- End diff --
   
    ok
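
    The `rowDataTypes` lookup in the diff above can be sketched with plain stand-in types (the two-case `DataType` hierarchy here is an assumption for illustration, not Spark's real one):

    ```scala
    sealed trait DataType
    case object StringType extends DataType
    case object IntegerType extends DataType

    // Stand-in for relation.output: (column name, type) pairs.
    val relationOutput = List(("id", IntegerType), ("name", StringType))
    // Stand-in for carbonLoadModel.getCsvHeaderColumns.
    val csvColumns = Seq("ID", "name", "extra")

    // For each CSV header column, use the relation's type when a
    // case-insensitive name match exists, else fall back to StringType.
    val rowDataTypes: Seq[DataType] = csvColumns.map { col =>
      relationOutput.find(_._1.equalsIgnoreCase(col)) match {
        case Some((_, dataType)) => dataType
        case None => StringType
      }
    }
    ```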


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157653659
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
           case c: CatalogRelation => c
         }.head.asInstanceOf[LogicalPlan]
     
    +
         val query: LogicalPlan = if (dataFrame.isDefined) {
    -      val timeStampformatString = CarbonProperties.getInstance()
    -        .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    -          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
           val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
    -        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
           val dateFormat = new SimpleDateFormat(dateFormatString)
    --- End diff --
   
    No, the logic is different; the two branches use different formats
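
    The else branch's fallback, as shown in the diff quoted earlier (prefer the format configured on the load model, else the default), can be sketched as a small helper; the name `resolveFormat` is an illustration, not code from the PR:

    ```scala
    // Prefer the configured format; fall back to the default when unset.
    def resolveFormat(configured: String, default: String): String =
      if (configured == null || configured.isEmpty) default else configured

    val timeStampformatString =
      resolveFormat("", "yyyy-MM-dd HH:mm:ss")   // unset -> default wins
    val dateFormatString =
      resolveFormat("dd-MM-yyyy", "yyyy-MM-dd")  // configured value wins
    ```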


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157655675
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
           case c: CatalogRelation => c
         }.head.asInstanceOf[LogicalPlan]
     
    +
    --- End diff --
   
    1. Ok, I have changed the method name.
    2. It just uses InsertIntoCommand to let Spark handle the partitioning, so it does not need to know anything about the partition information.
    3. I don't see any common code to extract; the if and else blocks have different logic.
    4. Added a comment.


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157656319
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -98,6 +97,8 @@ with Serializable {
           model,
           conf
         )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    --- End diff --
   
    It is the fixed Hive format we need to use. We already converted to this format in CarbonLoadDataCommand; Spark cannot understand the data if we don't convert to this format
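
    The conversion this comment describes (a user-specified format in, the fixed Hive-compatible format out) can be sketched as follows; the helper name and both format strings are assumptions for illustration:

    ```scala
    import java.text.SimpleDateFormat

    // Parse with the user's load-option format, then re-emit in the fixed
    // Hive-compatible format so Spark's partitioning understands the value.
    def toHiveTimestamp(value: String, userFormat: String): String = {
      val parsed = new SimpleDateFormat(userFormat).parse(value)
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(parsed)
    }
    ```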


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157656893
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -130,6 +130,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           table.carbonTable.getTableInfo.serialize())
       }
     
    +  protected def pruneFilterProject(
    --- End diff --
   
    ok


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157656940
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -313,8 +330,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     
       def getDataSourceScan(relation: LogicalRelation,
    --- End diff --
   
    ok


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157657244
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier): Seq[String] = {
    +    val partitions =
    +      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
    +    partitions.toList.flatMap { f =>
    --- End diff --
   
    ok


---
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157657430
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    --- End diff --
   
    This function will be used in other places as well, for other features such as compaction.


---

[GitHub] carbondata issue #1672: [CARBONDATA-1858][PARTITION] Support querying data f...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2114/



---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1672
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/889/



---