Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152571235 --- Diff: integration/spark2/pom.xml --- @@ -36,7 +36,7 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark-common</artifactId> + <artifactId>carbondata-streaming</artifactId> --- End diff -- Why is this change required? --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152571409 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -58,7 +59,7 @@ class CarbonEnv { ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) val config = new CarbonSQLConf(sparkSession) if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) { - config.addDefaultCarbonParams() +// config.addDefaultCarbonParams() --- End diff -- Remove this line if it is not required. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152571649 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -85,7 +86,7 @@ object CarbonEnv { def getInstance(sparkSession: SparkSession): CarbonEnv = { if (sparkSession.isInstanceOf[CarbonSession]) { - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv --- End diff -- Don't change the formatting. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152575948 --- Diff: assembly/pom.xml --- @@ -126,7 +126,7 @@ </build> <profiles> <profile> - <id>spark-2.1</id> --- End diff -- This profile should not be removed; add a separate profile for 2.2 instead. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152577180 --- Diff: integration/spark-common-cluster-test/pom.xml --- @@ -177,6 +177,17 @@ </profile> <profile> <id>spark-2.1</id> + <dependencies> --- End diff -- Why is this dependency added separately to the 2.1 profile? --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152577825 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { --- End diff -- Add comments --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152578348 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { + val castExpr = expr.asInstanceOf[Cast] + if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) + } else { + None + } + } else { + None + } + + } + } + + object CarbonDescribeTable { + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if 
(plan.isInstanceOf[DescribeTableCommand]) { + val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] + if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { + if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { --- End diff -- Combine all three `if` conditions into a single `if`. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152578680 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { + val castExpr = expr.asInstanceOf[Cast] + if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) + } else { + None + } + } else { + None + } + + } + } + + object CarbonDescribeTable { + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if 
(plan.isInstanceOf[DescribeTableCommand]) { + val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] + if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { + if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { + Some(describeTableCommand.table, + describeTableCommand.partitionSpec, + describeTableCommand.isExtended) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + } + } + } + + object CarbonSubqueryAlias { + def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = { + if (plan.isInstanceOf[SubqueryAlias]) { + val subqueryAlias = plan.asInstanceOf[SubqueryAlias] + if (subqueryAlias.alias.isInstanceOf[String]) { + if (subqueryAlias.child.isInstanceOf[LogicalPlan]) { --- End diff -- combine both `if` conditions --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152579581 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { + val castExpr = expr.asInstanceOf[Cast] + if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) + } else { + None + } + } else { + None + } + + } + } + + object CarbonDescribeTable { + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if 
(plan.isInstanceOf[DescribeTableCommand]) { + val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] + if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { + if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { + Some(describeTableCommand.table, + describeTableCommand.partitionSpec, + describeTableCommand.isExtended) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + } + } + } + + object CarbonSubqueryAlias { + def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = { + if (plan.isInstanceOf[SubqueryAlias]) { + val subqueryAlias = plan.asInstanceOf[SubqueryAlias] + if (subqueryAlias.alias.isInstanceOf[String]) { + if (subqueryAlias.child.isInstanceOf[LogicalPlan]) { + Some(subqueryAlias.alias, + subqueryAlias.child) --- End diff -- Move to above line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152582049 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -19,10 +19,10 @@ package org.apache.spark.sql import java.io.File import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext --- End diff -- Don't change the import order; keep these imports combined as they were before. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152582199 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + --- End diff -- Remove the commented-out code. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152582387 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { + if (sparkContext.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSessionState") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val sessionState1 = ctor.newInstance(this).asInstanceOf[SessionState] + sessionState1 + } else if (sparkContext.version.contains("2.2")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSessionStateBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val sessionStateBuilder = ctor.newInstance(this, None) + val method = clazz.getMethod("build") + val sessionState1: SessionState = method.invoke(sessionStateBuilder) + .asInstanceOf[SessionState] --- End diff -- move to above line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152582582 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { --- End diff -- Move this reflection code into a common utility. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152582968 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala --- @@ -96,6 +99,8 @@ object CastExpressionOptimization { } } + + --- End diff -- Remove the empty lines. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152583029 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala --- @@ -21,16 +21,19 @@ import java.text.{ParseException, SimpleDateFormat} import java.util import java.util.{Locale, TimeZone} +import scala.Option import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} import org.apache.spark.sql.CastExpr import org.apache.spark.sql.sources -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties + --- End diff -- remove the empty line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152584958 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala --- @@ -80,7 +80,13 @@ case class CarbonCreateTableCommand( val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) - +// +// sparkSession.sql( +// s"""CREATE TABLE $dbName.$tbName +// |(${ fields.map(f => f.rawSchema.replace("`", "")).mkString(",") }) +// |USING org.apache.spark.sql.CarbonSource""".stripMargin + +// s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + +// s""""$tablePath"$carbonSchemaString) """) --- End diff -- Remove the commented-out code. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1469 @sounakr Please check the build for 2.2 http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/375/ --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152721088 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { + if (sparkContext.version.contains("2.1")) { --- End diff -- It would be better to use sparkContext.version.**startsWith**("2.1"): for version 2.2.1, contains("2.1") also returns true, so because this branch is checked first, a 2.2.x build would be treated as 2.1. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152725691 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { + if (sparkContext.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSessionState") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val sessionState1 = ctor.newInstance(this).asInstanceOf[SessionState] + sessionState1 + } else if (sparkContext.version.contains("2.2")) { --- End diff -- Use sparkContext.version.startsWith("2.2") here as well. --- |
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r152725990 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -42,6 +43,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil + --- End diff -- Remove the empty line. --- |
Free forum by Nabble | Edit this page |