Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157647527

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
             case c: CatalogRelation => c
           }.head.asInstanceOf[LogicalPlan]
    +
           val query: LogicalPlan = if (dataFrame.isDefined) {
    -        val timeStampformatString = CarbonProperties.getInstance()
    -          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    -            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +        var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
             val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -        val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
    -          .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +        var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
             val dateFormat = new SimpleDateFormat(dateFormatString)
    --- End diff --

    Move `timeStampformatString`, `timeStampFormat`, `dateFormatString`, and `dateFormat` before line 451; they are used in both the if and the else branch.
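(For illustration, the hoisting being suggested might look like the sketch below. It is hypothetical and assumes both branches could share one pair of formats; as the author replies further down, the two branches actually resolve their formats differently, so this refactor was not applied.)

    import java.text.SimpleDateFormat

    // Hypothetical hoisting: declare the formats once, before the if/else,
    // so both arms can reuse them (only valid if both arms want the same formats).
    val timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    val dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    val dateFormat = new SimpleDateFormat(dateFormatString)
    val query: LogicalPlan = if (dataFrame.isDefined) {
      // ... use timeStampFormat / dateFormat ...
    } else {
      // ... use timeStampFormat / dateFormat ...
    }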
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157649340

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
             case c: CatalogRelation => c
           }.head.asInstanceOf[LogicalPlan]
    +
    --- End diff --

    Comment for line 437 (the function signature):
    1. Suggest renaming `loadDataWithPartition`.
    2. From the signature, a user cannot easily tell why it is partition-specific; no parameter is partition-related. It is similar to `CarbonDataRDDFactory.loadCarbonData` only; can we make it more readable?
    3. There is a lot of repeated code in the if block and the else block; can we extract the common part into another function?
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157649480

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -98,6 +97,8 @@ with Serializable {
           model,
           conf
         )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    --- End diff --

    Why not use the value from CarbonProperties?
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157649569

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -130,6 +130,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             table.carbonTable.getTableInfo.serialize())
       }
    +
    +  protected def pruneFilterProject(
    --- End diff --

    Add a comment.
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157652020

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -313,8 +330,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       def getDataSourceScan(relation: LogicalRelation,
    --- End diff --

    Can be private.
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157652168

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier): Seq[String] = {
    +    val partitions =
    +      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
    +    partitions.toList.flatMap { f =>
    --- End diff --

    Can you use `case (xxx, xxx)` to replace `f`, to make it more readable?
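(A hedged sketch of the style being requested: give the binder a meaningful name and destructure map entries with a `case` pattern instead of the opaque `f`. It assumes `getPartitions` renders each partition spec entry as a "column=value" string; the quoted diff is truncated before the body is shown. `listPartitionsByFilter` returns Spark `CatalogTablePartition`s, whose `spec` is a `Map[String, String]`.)

    // Sketch only, not the PR's final code:
    partitions.toList.flatMap { partition =>
      partition.spec.map { case (column, value) => column + "=" + value }
    }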
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157652296

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    --- End diff --

    I think it is better to do this logic in `pruneFilterProject` directly; this function is not needed.
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157653185

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---
    @@ -152,6 +154,22 @@ object CarbonScalaUtil {
         }
       }
    +
    +  def getString(value: String,
    --- End diff --

    ok
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157653275

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
           val attributes = StructType(carbonLoadModel.getCsvHeaderColumns.map(
             StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    --- End diff --

    ok
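(An aside on the fallback pattern in the hunk above, as a hedged sketch: the same "use the model's format, else the default" logic can be written without a `var`, assuming an unset format comes back as an empty string, as the quoted code implies.)

    import java.text.SimpleDateFormat

    // Equivalent fallback without mutation: take the model's format if present
    // and non-empty, otherwise fall back to the default.
    val timeStampFormatString = Option(carbonLoadModel.getTimestampformat)
      .filter(_.nonEmpty)
      .getOrElse(carbonLoadModel.getDefaultTimestampFormat)
    val timeStampFormat = new SimpleDateFormat(timeStampFormatString)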
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157653428

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -479,22 +479,52 @@ case class CarbonLoadDataCommand(
           LogicalRDD(attributes, rdd)(sparkSession)
         } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
           // input data from csv files. Convert to logical plan
           CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
           hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
           val jobConf = new JobConf(hadoopConf)
           SparkHadoopUtil.get.addCredentials(jobConf)
    -      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -        sparkSession.sparkContext,
    -        classOf[CSVInputFormat],
    -        classOf[NullWritable],
    -        classOf[StringArrayWritable],
    -        jobConf
    -      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
           val attributes = StructType(carbonLoadModel.getCsvHeaderColumns.map(
             StructField(_, StringType))).toAttributes
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    --- End diff --

    ok
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157653659

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
             case c: CatalogRelation => c
           }.head.asInstanceOf[LogicalPlan]
    +
           val query: LogicalPlan = if (dataFrame.isDefined) {
    -        val timeStampformatString = CarbonProperties.getInstance()
    -          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
    -            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    +        var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
             val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -        val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
    -          .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    +        var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
             val dateFormat = new SimpleDateFormat(dateFormatString)
    --- End diff --

    No, the logic is different; the two branches use different formats.
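(Piecing together the two hunks quoted in this thread, hedged: the dataFrame branch pins the formats to the Carbon defaults, while the CSV branch resolves them from the load model with a fallback to the defaults, which is why hoisting a single pair of declarations above the if/else would change behavior.)

    val query: LogicalPlan = if (dataFrame.isDefined) {
      // dataFrame path: fixed default formats
      val timeStampFormat = new SimpleDateFormat(
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
      // ...
    } else {
      // CSV path: formats from the load model, falling back to the defaults
      var timeStampformatString = carbonLoadModel.getTimestampformat
      if (timeStampformatString.isEmpty) {
        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
      }
      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
      // ...
    }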
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157655675

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -445,13 +447,11 @@ case class CarbonLoadDataCommand(
             case c: CatalogRelation => c
           }.head.asInstanceOf[LogicalPlan]
    +
    --- End diff --

    1. OK, I have changed the method name.
    2. It just uses InsertIntoCommand to let Spark handle the partitioning, so it does not need to know anything about the partitioning information.
    3. I don't see any common code to extract; the if and else blocks have different logic.
    4. Added a comment.
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157656319

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---
    @@ -98,6 +97,8 @@ with Serializable {
           model,
           conf
         )
    +    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    --- End diff --

    It is the fixed Hive format we need to use. We already converted to this format in CarbonLoadDataCommand; otherwise Spark cannot understand the data.
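(To illustrate the point with a hedged sketch, not the project's actual code: partition values written into directory paths need one fixed, Hive-compatible format regardless of the DATEFORMAT/TIMESTAMPFORMAT options the user loaded with, or Spark cannot parse them back. The concrete format strings below assume the usual CarbonData defaults.)

    import java.text.SimpleDateFormat
    import java.util.Date

    // Hypothetical round trip: whatever format the input CSV used, the value is
    // re-rendered in the fixed default format before it reaches the partition path.
    val userFormat  = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss") // user's load option
    val fixedFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // assumed CARBON_TIMESTAMP_DEFAULT_FORMAT
    val parsed: Date   = userFormat.parse("25-12-2017 10:30:00")
    val stored: String = fixedFormat.format(parsed)               // "2017-12-25 10:30:00"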
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157656893

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -130,6 +130,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             table.carbonTable.getTableInfo.serialize())
       }
    +
    +  protected def pruneFilterProject(
    --- End diff --

    ok
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157656940

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -313,8 +330,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       def getDataSourceScan(relation: LogicalRelation,
    --- End diff --

    ok
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157657244

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier): Seq[String] = {
    +    val partitions =
    +      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
    +    partitions.toList.flatMap { f =>
    --- End diff --

    ok
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/1672#discussion_r157657430

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---
    @@ -395,4 +391,14 @@ object CarbonFilters {
           case _ => expressions
         }
       }
    +
    +  def getPartitions(partitionFilters: Seq[Expression],
    --- End diff --

    This function will be further used in other places, for other features like compaction.
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1672

    Build Failed with Spark 2.1.0, please check CI:
    http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2114/
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/1672

    Build Failed with Spark 2.2.0, please check CI:
    http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/889/