Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196314165

--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala ---
@@ -0,0 +1,124 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append
+ * @param query the query whose result will be insert into the new relation
+ *
+ */
+
--- End diff --

remove empty line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196314387

--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
     do {
       try {
         LOG.info(s"starting registry-service on $hostAddress:$port")
-        val config = RpcEnvConfig(
+        val config = RpcUtil.getRpcEnvConfig(
--- End diff --

RPC was refactored in #2372; this should be rebased after #2372 is merged.

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196341741

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
@@ -127,7 +127,7 @@ class CarbonAppendableStreamSink(
       className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
       jobId = batchId.toString,
       outputPath = fileLogPath,
-      isAppend = false)
+      false)
--- End diff --

In Spark 2.2.1 the parameter is named "isAppend", while in 2.3.0 it is named "dynamicPartitionOverwrite", so passing the argument positionally is a required change.

---
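For context on why dropping the argument label matters: Scala resolves named arguments against the parameter names of whichever Spark version the module is compiled against, so a parameter rename breaks only the named call sites. A minimal, runnable sketch of the effect — instantiateV22 and instantiateV23 are hypothetical stand-ins for the two signatures of FileCommitProtocol.instantiate, with parameter names taken from the comment above:

object NamedArgDemo {
  // Hypothetical stand-ins for the Spark 2.2.1 and 2.3.0 signatures.
  def instantiateV22(className: String, jobId: String,
      outputPath: String, isAppend: Boolean): Unit = ()
  def instantiateV23(className: String, jobId: String,
      outputPath: String, dynamicPartitionOverwrite: Boolean = false): Unit = ()

  def main(args: Array[String]): Unit = {
    // Positional arguments compile against either signature, which is why
    // the change drops the `isAppend =` label from the call site.
    instantiateV22("commitCls", "job-0", "/tmp/out", false)
    instantiateV23("commitCls", "job-0", "/tmp/out", false)
    // instantiateV23("commitCls", "job-0", "/tmp/out", isAppend = false) // does not compile
  }
}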
Github user sujith71955 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196650804

--- Diff: integration/spark-common/pom.xml ---
@@ -65,6 +65,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
--- End diff --

Not an intentional change, I guess :)

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196654884

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -65,7 +66,7 @@ object CarbonReflectionUtils {
         className,
         tableIdentifier,
         tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3")) {
--- End diff --

Fixed; added a utility method for Spark version comparison in SparkUtil.scala.

---
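A minimal sketch of what such a version-comparison helper could look like; the object and method names below are illustrative assumptions, not the actual contents of SparkUtil.scala:

import org.apache.spark.SPARK_VERSION

object SparkVersionUtilSketch {
  // True when the running Spark version starts with the given "major.minor" prefix.
  def isSparkVersionEqualTo(version: String): Boolean =
    SPARK_VERSION.startsWith(version)

  // True when the running major.minor version is at least `version`.
  def isSparkVersionXandAbove(version: String): Boolean = {
    def majorMinor(v: String): Double =
      v.split("\\.").take(2).mkString(".").toDouble
    majorMinor(SPARK_VERSION) >= majorMinor(version)
  }
}

With a helper like this, scattered checks such as SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3") collapse into a single isSparkVersionXandAbove("2.2") call.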
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196654906

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -140,6 +142,13 @@ object CarbonReflectionUtils {
         relation,
         expectedOutputAttributes,
         catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SPARK_VERSION.startsWith("2.3")) {
--- End diff --

Fixed; added a utility method for Spark version comparison in SparkUtil.scala.

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196654926

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
@@ -355,18 +362,19 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   }

   private def getDataSourceScan(relation: LogicalRelation,
-      output: Seq[Attribute],
-      partitions: Seq[PartitionSpec],
-      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
-      candidatePredicates: Seq[Expression],
-      pushedFilters: Seq[Filter],
-      metadata: Map[String, String],
-      needDecoder: ArrayBuffer[AttributeReference],
-      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
+      output: Seq[Attribute],
--- End diff --

fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196654954

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala ---
@@ -149,8 +149,9 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
   }

   test("test sum*10 aggregation on big decimal column with high precision") {
-    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
-      sql("select sum(salary)*10 from hiveBigDecimal"))
+    val carbonSeq = sql("select sum(salary)*10 from carbonBigDecimal_2").collect
--- End diff --

fixed

---
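A plausible shape for the reworked test body, comparing the two collected results value-by-value so that a precision or scale mismatch between the engines does not fail plain Row equality. Only the carbonSeq line is visible in the hunk above; the rest of this fragment is an assumed completion that would run inside the same QueryTest:

val carbonSeq = sql("select sum(salary)*10 from carbonBigDecimal_2").collect
val hiveSeq = sql("select sum(salary)*10 from hiveBigDecimal").collect
carbonSeq.zip(hiveSeq).foreach { case (carbonRow, hiveRow) =>
  // Compare the decimal values numerically instead of relying on Row
  // equality, which is sensitive to declared precision and scale.
  assert(carbonRow.getDecimal(0).compareTo(hiveRow.getDecimal(0)) == 0)
}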
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655020

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala ---
@@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach {
       sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ")
     } catch {
      case e: Exception =>
-        assert(e.getMessage.contains("no viable alternative at input"))
+        assert(true)
--- End diff --

Fixed; added an OR condition with the message as per Spark 2.3.0.

---
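A sketch of what the relaxed catch block could look like; the Spark 2.3.0 wording ("mismatched input") is an assumption here, not quoted from the Spark parser:

try {
  sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ")
} catch {
  case e: Exception =>
    // Accept the parser wording of either Spark version; the 2.3.0
    // message text is an assumption.
    assert(e.getMessage.contains("no viable alternative at input") ||
           e.getMessage.contains("mismatched input"))
}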
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655128

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
           // named expression list otherwise update the list and add it to set
           if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
             namedExpressionList +=
-            Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+            CarbonCompilerUtil.createAliasRef(expressions.head,
+              name + "_ sum",
+              NamedExpression.newExprId,
               alias.qualifier,
               Some(alias.metadata),
-              alias.isGenerated)
+              Some(alias))
             validExpressionsMap += AggExpToColumnMappingModel(sumExp)
           }
           // check with same expression already count is present then do not add to
           // named expression list otherwise update the list and add it to set
           if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
             namedExpressionList +=
-            Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
-              alias.qualifier,
-              Some(alias.metadata),
-              alias.isGenerated)
+            CarbonCompilerUtil.createAliasRef(expressions.last, name + "_ count",
--- End diff --

Fixed; changed the name from CarbonCompilerUtil to CarbonToSparkAdapater.

---
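The incompatibility being bridged here is that the Alias constructor dropped its isGenerated parameter in Spark 2.3. A sketch of what the 2.3 flavor of the adapter method could look like; the shape follows the call sites in the hunk above, but the body is an illustration rather than the actual adapter source:

import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId, Expression, NamedExpression}
import org.apache.spark.sql.types.Metadata

object CarbonToSparkAdapaterSketch {
  // Spark 2.3 flavor: Alias no longer accepts `isGenerated`, so the extra
  // argument carried over from the 2.2-era call sites is simply dropped.
  def createAliasRef(child: Expression,
      name: String,
      exprId: ExprId = NamedExpression.newExprId,
      qualifier: Option[String] = None,
      explicitMetadata: Option[Metadata] = None,
      namedExpr: Option[NamedExpression] = None): Alias = {
    Alias(child, name)(exprId, qualifier, explicitMetadata)
  }
}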
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655176

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -247,6 +252,32 @@ object CarbonReflectionUtils {
     isFormatted
   }

+
--- End diff --

Fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655227

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -247,6 +252,32 @@ object CarbonReflectionUtils {
     isFormatted
   }

+
+  def getRowDataSourceScanExecObj(relation: LogicalRelation,
--- End diff --

fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655245

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.util.CarbonProperties

-/**
- * Carbon strategies for ddl commands
- */
+  /** Carbon strategies for ddl commands
--- End diff --

fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655288

--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionState.scala ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog (
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  /**
+   * return's the carbonEnv instance
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * returns hive client from HiveExternalCatalog
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
--- End diff --

fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655419

--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala ---
@@ -0,0 +1,124 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append
+ * @param query the query whose result will be insert into the new relation
+ *
+ */
+
--- End diff --

fixed

---
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196655625

--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
     do {
       try {
         LOG.info(s"starting registry-service on $hostAddress:$port")
-        val config = RpcEnvConfig(
+        val config = RpcUtil.getRpcEnvConfig(
--- End diff --

After analyzing #2372, these changes are not required, so they have been reverted.

---
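For background on why a wrapper was considered at all: the private RpcEnvConfig case class gained an extra numUsableCores parameter in Spark 2.3 (an assumption based on the Spark sources of that era), so constructing it directly cannot compile against both versions. A reflective sketch of such a factory; RpcUtilSketch is illustrative and not the code that was merged in #2372:

import org.apache.spark.{SecurityManager, SparkConf, SPARK_VERSION}

object RpcUtilSketch {
  def getRpcEnvConfig(conf: SparkConf,
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      securityMgr: SecurityManager,
      clientMode: Boolean): AnyRef = {
    val clazz = Class.forName("org.apache.spark.rpc.RpcEnvConfig")
    val ctor = clazz.getConstructors.head
    // Spark 2.2.x takes (conf, name, bindAddress, advertiseAddress, port,
    // securityManager, clientMode); Spark 2.3.x inserts numUsableCores
    // before clientMode (assumed; 0 lets the RPC layer pick a default).
    val args: Seq[AnyRef] =
      if (SPARK_VERSION.startsWith("2.2")) {
        Seq(conf, name, bindAddress, advertiseAddress, Int.box(port),
          securityMgr, Boolean.box(clientMode))
      } else {
        Seq(conf, name, bindAddress, advertiseAddress, Int.box(port),
          securityMgr, Int.box(0), Boolean.box(clientMode))
      }
    ctor.newInstance(args: _*).asInstanceOf[AnyRef]
  }
}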
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2366

Build Failed with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5236/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2366

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6402/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2366

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/5347/

---
Github user sujith71955 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2366#discussion_r196715002

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
@@ -71,15 +72,10 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.spark.memory.MemoryMode;
+import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
--- End diff --

Please find the task JIRA id: https://issues.apache.org/jira/browse/CARBONDATA-2619

---