[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062720
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
    @@ -184,10 +126,86 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         }
       }
     
    -  private def needToConvertToLowerCase(key: String): Boolean = {
    -    val noConvertList = Array("LIST_INFO", "RANGE_INFO")
    -    !noConvertList.exists(x => x.equalsIgnoreCase(key));
    +  def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
    +  = {
    +    Option(ctx).map(visitPropertyKeyValues)
    +      .getOrElse(Map.empty)
       }
     
    +  def createCarbontable(tableHeader: CreateTableHeaderContext,
    +      skewSpecContext: SkewSpecContext,
    +      bucketSpecContext: BucketSpecContext,
    +      partitionColumns: ColTypeListContext,
    +      columns : ColTypeListContext,
    +      tablePropertyList : TablePropertyListContext) : LogicalPlan = {
    +    // val parser = new CarbonSpark2SqlParser
    +
    +    val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
    +    // TODO: implement temporary tables
    +    if (temp) {
    +      throw new ParseException(
    +        "CREATE TEMPORARY TABLE is not supported yet. " +
    +        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
    +    }
    +    if (skewSpecContext != null) {
    +      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
    +    }
    +    if (bucketSpecContext != null) {
    +      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
    +    }
    +    val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
    +    val partitionerFields = partitionByStructFields.map { structField =>
    +      PartitionerField(structField.name, Some(structField.dataType.toString), null)
    +    }
    +    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
    +    val properties = getPropertyKeyValues(tablePropertyList)
    +
    +    // Ensure that no duplicate column name is used in the table definition
    +    val colNames = cols.map(_.name)
    +    if (colNames.length != colNames.distinct.length) {
    +      val duplicateColumns = colNames.groupBy(identity).collect {
    +        case (x, ys) if ys.length > 1 => "\"" + x + "\""
    +      }
    +      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
    --- End diff --
   
    Indentation is wrong
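
    For reference, a minimal standalone sketch of the duplicate-name check with
    conventional two-space continuation indentation (the `operationNotAllowed`
    call is replaced by a plain exception here, since its argument list is
    truncated in the quoted diff):

        val colNames = Seq("id", "name", "id")
        if (colNames.length != colNames.distinct.length) {
          val duplicateColumns = colNames.groupBy(identity).collect {
            case (x, ys) if ys.length > 1 => "\"" + x + "\""
          }
          // stands in for operationNotAllowed(...) from the parser
          throw new IllegalArgumentException(
            s"Duplicated column names found: ${duplicateColumns.mkString("[", ",", "]")}")
        }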


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062729
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---
    @@ -21,10 +21,11 @@ import scala.collection.JavaConverters._
     import scala.collection.mutable.ListBuffer
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.sql.{CarbonEnv, SparkSession}
    +import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
     import org.apache.spark.sql.catalyst.TableIdentifier
    -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
    +import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
     import org.apache.spark.sql.hive.HiveExternalCatalog._
    +import org.apache.spark.sql.internal.SessionState
    --- End diff --
   
    Remove unused import


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062759
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field value from an object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]
    --- End diff --
   
    Don't type cast, and don't duplicate the method. Keep the method in the common package.
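
    A hedged sketch of a cast-free variant (the helper name and `Option` return
    type are illustrative, not the final API): return the raw field value and
    let the caller pattern match, so the helper itself never assumes
    `LogicalPlan`.

        import scala.reflect.ClassTag
        import scala.reflect.runtime.universe

        object ReflectionSketch {
          private val rm = universe.runtimeMirror(getClass.getClassLoader)

          // Return the raw field value instead of casting to LogicalPlan.
          def getField[T: ClassTag](name: String, obj: T): Option[Any] = {
            val im = rm.reflect(obj)
            im.symbol.typeSignature.members
              .find(_.name.toString == name)
              .map(sym => im.reflectField(sym.asTerm).get)
          }
        }

    A caller that needs a `LogicalPlan` can then match on the result, e.g.
    `getField("child", plan) match { case Some(p: LogicalPlan) => ... }`.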


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062777
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field value from an object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan]
    +    ).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    --- End diff --
   
    remove this method


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062806
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    --- End diff --
   
    Separate into two classes: one a common reflection util and the other a Spark-version-specific reflection util. Move as much code as possible into the common reflection util,
    and try to make the code more generic.
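
    One possible shape for the split, as a hedged sketch (class and method
    names here are illustrative): a version-agnostic helper in a common module,
    with the Spark-version-specific util staying thin and delegating to it.

        // Version-agnostic helper; could live in a common (non-Spark) module.
        object CommonReflectionUtils {
          // Instantiate a class by name, picking the constructor whose arity
          // matches; a real implementation would also check parameter types.
          def createObject(className: String, args: AnyRef*): Any = {
            val ctor = Class.forName(className).getConstructors
              .find(_.getParameterCount == args.length)
              .getOrElse(throw new NoSuchMethodException(className))
            ctor.newInstance(args: _*)
          }
        }

        // The Spark-version-specific util delegates instead of duplicating code.
        object CarbonReflectionSketch {
          def newSqlParser(conf: AnyRef, sparkSession: AnyRef): Any =
            CommonReflectionUtils.createObject(
              "org.apache.spark.sql.parser.CarbonSparkSqlParser", conf, sparkSession)
        }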


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062831
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -24,16 +24,18 @@ import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTemp
     import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
     import org.apache.spark.sql.catalyst.optimizer.Optimizer
     import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
     import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
    -import org.apache.spark.sql.execution.SparkOptimizer
    +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
     import org.apache.spark.sql.execution.datasources._
     import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
     import org.apache.spark.sql.internal.SQLConf
     import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    -import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
     
     import org.apache.carbondata.core.datamap.DataMapStoreManager
     import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +import org.apache.carbondata.core.util.CarbonProperties
    --- End diff --
   
    Remove unused import


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062848
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -84,8 +86,8 @@ class CarbonSessionCatalog(
         var toRefreshRelation = false
         rtnRelation match {
           case SubqueryAlias(_,
    -          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    -          _) =>
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    +      _) =>
    --- End diff --
   
    Move this line up
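
    Presumably this asks for the pattern's continuation lines to stay indented
    under `SubqueryAlias(`, e.g. (a formatting guess, re-indenting the lines
    quoted in the diff):

        case SubqueryAlias(_,
            LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
            _) =>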


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153062859
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -107,15 +110,14 @@ class CarbonSessionCatalog(
         carbonEnv.carbonMetastore.
           checkSchemasModifiedTimeAndReloadTables(storePath)
     
    -    val tableMeta = carbonEnv.carbonMetastore
    -      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    -        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    -    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    -        tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    -          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
    +      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +      carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (table.isEmpty || (table.isDefined &&
    +                          table.get.carbonTable.getTableLastUpdatedTime !=
    --- End diff --
   
    wrong indentation
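
    A guess at the intended two-space continuation style, re-indenting the
    lines quoted in the diff:

        if (table.isEmpty || (table.isDefined &&
            table.get.carbonTable.getTableLastUpdatedTime !=
            carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {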


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064184
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -133,7 +134,9 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
       override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
     
       experimentalMethods.extraStrategies =
    -    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +    Seq(new CarbonLateDecodeStrategy,
    +      new DDLStrategy(sparkSession)
    +    )
    --- End diff --
   
    Keep the old format:
     `Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))`


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064193
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -150,9 +153,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
             DataSourceAnalysis(conf) ::
             (if (conf.runSQLonFile) {
               new ResolveDataSource(sparkSession) :: Nil
    -        } else {
    -          Nil
    -        })
    +        } else {  Nil }
    +          )
    --- End diff --
   
    Keep the old format.
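
    That is, restore the multi-line form shown in the removed lines of the diff:

        (if (conf.runSQLonFile) {
          new ResolveDataSource(sparkSession) :: Nil
        } else {
          Nil
        })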


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064203
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -207,3 +213,26 @@ class CarbonOptimizer(
         super.execute(transFormedPlan)
       }
     }
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
    +  SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperqlAstBuilder(conf, parser)
    +
    +  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      helper
    +        .createCarbontable(ctx.createTableHeader,
    --- End diff --
   
    move line up
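
    I.e. join the receiver and the call on one line (the remaining arguments
    are truncated in the quoted diff):

        helper.createCarbontable(ctx.createTableHeader,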


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064235
 
    --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, CreateTableContext}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
    +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
    +import org.apache.spark.sql.hive.client.HiveClient
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
    +
    +import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +
    +/**
    + * This class holds the carbon catalog and refreshes the relation from the cache if the
    + * carbon table in the carbon catalog differs from the cached carbon relation's carbon table.
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) {
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +
    +  private def refreshRelationFromCache(identifier: TableIdentifier,
    +      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
    +    var isRefreshed = false
    +    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    +    carbonEnv.carbonMetastore.
    +      checkSchemasModifiedTimeAndReloadTables(storePath)
    +
    +    val tableMeta = carbonEnv.carbonMetastore
    +      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    +                              tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    +                              carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +      refreshTable(identifier)
    +      DataMapStoreManager.getInstance().
    +        clearDataMap(AbsoluteTableIdentifier.from(storePath,
    +          identifier.database.getOrElse("default"), identifier.table))
    +      isRefreshed = true
    +      logInfo(s"Schema changes have been detected for table: $identifier")
    +    }
    +    isRefreshed
    +  }
    +
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    var toRefreshRelation = false
    +    rtnRelation match {
    +      case SubqueryAlias(_,
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
    +        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
    +      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
    +        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
    +      case _ =>
    +    }
    +
    +    if (toRefreshRelation) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +}
    +
    +/**
    + * Session state implementation to override sql parser and adding strategies
    + *
    + * @param sparkSession
    + */
    +class CarbonSessionStateBuilder(sparkSession: SparkSession,
    +    parentState: Option[SessionState] = None)
    +  extends HiveSessionStateBuilder(sparkSession, parentState) {
    +
    +  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
    +
    +  experimentalMethods.extraStrategies =
    +    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
    +
    +  /**
    +   * Internal catalog for managing table and database states.
    +   */
    +  /**
    +   * Create a [[CarbonSessionCatalogBuild]].
    +   */
    +  override protected lazy val catalog: CarbonSessionCatalog = {
    +    val catalog = new CarbonSessionCatalog(
    +      externalCatalog,
    +      session.sharedState.globalTempViewManager,
    +      functionRegistry,
    +      sparkSession,
    +      conf,
    +      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    +      sqlParser,
    +      resourceLoader)
    +    parentState.foreach(_.catalog.copyStateTo(catalog))
    +    catalog
    +  }
    +
    +  private def externalCatalog: HiveExternalCatalog =
    +    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
    +
    +  /**
    +   * Create a Hive aware resource loader.
    +   */
    +  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
    +    val client: HiveClient = externalCatalog.client.newSession()
    +    new HiveSessionResourceLoader(session, client)
    +  }
    +
    +  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
    +
    +  override protected def analyzer: Analyzer = {
    +    new Analyzer(catalog, conf) {
    +
    +      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new ResolveHiveSerdeTable(session) +:
    +        new FindDataSourceTable(session) +:
    +        new ResolveSQLOnFile(session) +:
    +        new CarbonIUDAnalysisRule(sparkSession) +:
    +        CarbonPreInsertionCasts +: customResolutionRules
    +
    +      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    +      PreWriteCheck :: HiveOnlyCheck :: Nil
    +
    +      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new DetermineTableStats(session) +:
    +        RelationConversions(conf, catalog) +:
    +        PreprocessTableCreation(session) +:
    +        PreprocessTableInsertion(conf) +:
    +        DataSourceAnalysis(conf) +:
    +        HiveAnalysis +:
    +        customPostHocResolutionRules
    +    }
    +  }
    +
    +  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
    +
    +}
    +
    +
    +class CarbonOptimizer(
    +    catalog: SessionCatalog,
    +    conf: SQLConf,
    +    experimentalMethods: ExperimentalMethods)
    +  extends SparkOptimizer(catalog, conf, experimentalMethods) {
    +
    +  override def execute(plan: LogicalPlan): LogicalPlan = {
    +    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
    +    // optimize whole plan at once.
    +    val transFormedPlan = plan.transform {
    +      case filter: Filter =>
    +        filter.transformExpressions {
    +          case s: ScalarSubquery =>
    +            val tPlan = s.plan.transform {
    +              case lr: LogicalRelation
    +                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
    +                lr
    +            }
    +            ScalarSubquery(tPlan, s.children, s.exprId)
    +        }
    +    }
    +    super.execute(transFormedPlan)
    +  }
    +}
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
    +  SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperqlAstBuilder(conf, parser)
    +
    +  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      helper
    +        .createCarbontable(ctx.createTableHeader,
    --- End diff --
   
    move line up
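
    Same fix as in the Spark 2.1 builder above: join the receiver and the call
    on one line:

        helper.createCarbontable(ctx.createTableHeader,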


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064253
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala ---
    @@ -85,71 +85,75 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
           "('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')")
         checkAnswer(sql("select distinct(nodict) from restructure"), Row("abcd"))
       }
    -  test("test add timestamp no dictionary column") {
    -    sql(
    -      "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".tmpstmp'= '17-01-2007')")
    -    checkAnswer(sql("select distinct(tmpstmp) from restructure"),
    -      Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
    -    checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
    -  }
    -
    -  test("test add timestamp direct dictionary column") {
    -    sql(
    -      "alter table restructure add columns(tmpstmp1 timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".tmpstmp1'= '17-01-3007','DICTIONARY_INCLUDE'='tmpstmp1')")
    -    checkAnswer(sql("select distinct(tmpstmp1) from restructure"),
    -      Row(null))
    -    checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
    -  }
    -
    -  test("test add timestamp column and load as dictionary") {
    -    sql("create table table1(name string) stored by 'carbondata'")
    -    sql("insert into table1 select 'abc'")
    -    sql("alter table table1 add columns(tmpstmp timestamp) TBLPROPERTIES " +
    -        "('DEFAULT.VALUE.tmpstmp'='17-01-3007','DICTIONARY_INCLUDE'= 'tmpstmp')")
    -    sql("insert into table1 select 'name','17-01-2007'")
    -    checkAnswer(sql("select * from table1"),
    -      Seq(Row("abc",null),
    -        Row("name",Timestamp.valueOf("2007-01-17 00:00:00.0"))))
    -  }
    -
    -  test("test add msr column") {
    -    sql(
    -      "alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".msrfield'= '123.45')")
    -    checkExistence(sql("desc restructure"), true, "msrfielddecimal(5,2)")
    -    val output = sql("select msrField from restructure").collect
    -    checkAnswer(sql("select distinct(msrField) from restructure"),
    -      Row(new BigDecimal("123.45").setScale(2, RoundingMode.HALF_UP)))
    -  }
    -
    -  test("test add all datatype supported dictionary column") {
    -    sql(
    -      "alter table restructure add columns(strfld string, datefld date, tptfld timestamp, " +
    -      "shortFld smallInt, " +
    -      "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" +
    -      "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE" +
    -      ".dblFld'= '12345')")
    -    checkAnswer(sql("select distinct(dblFld) from restructure"),
    -      Row(java.lang.Double.parseDouble("12345")))
    -    checkExistence(sql("desc restructure"), true, "strfldstring")
    -    checkExistence(sql("desc restructure"), true, "dateflddate")
    -    checkExistence(sql("desc restructure"), true, "tptfldtimestamp")
    -    checkExistence(sql("desc restructure"), true, "shortfldsmallint")
    -    checkExistence(sql("desc restructure"), true, "intfldint")
    -    checkExistence(sql("desc restructure"), true, "longfldbigint")
    -    checkExistence(sql("desc restructure"), true, "dblflddouble")
    -    checkExistence(sql("desc restructure"), true, "dcmldecimal(5,4)")
    -  }
    -
    -  test(
    -    "test add decimal without scale and precision, default precision and scale (10,0) should be " +
    -    "used")
    -  {
    -    sql("alter table restructure add columns(dcmldefault decimal)")
    -    checkExistence(sql("desc restructure"), true, "dcmldefaultdecimal(10,0)")
    -  }
    +//  test("test add timestamp no dictionary column") {
    --- End diff --
   
    Don't comment out any tests; just ignore them. They should be fixed soon.
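
    In ScalaTest, which these suites extend through `Spark2QueryTest`, that
    means switching `test(...)` to `ignore(...)`: the body stays compiled and
    the runner reports the case as ignored instead of silently dropping it.
    For example, the first commented-out test above would become:

        ignore("test add timestamp no dictionary column") {
          sql(
            "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
            ".tmpstmp'= '17-01-2007')")
          checkAnswer(sql("select distinct(tmpstmp) from restructure"),
            Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
          checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
        }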


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064274
 
    --- Diff: pom.xml ---
    @@ -509,6 +501,8 @@
                     <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    --- End diff --
   
    why are these added?


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064291
 
    --- Diff: pom.xml ---
    @@ -554,6 +553,8 @@
                     <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    --- End diff --
   
    why are these added?


---

[GitHub] carbondata issue #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    Build failed with Spark 2.2.0. Please check CI:
    http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/377/


---

[GitHub] carbondata issue #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1469
 
    @sounakr Please change the branch to master.


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153076314
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.execution.command.DescribeTableCommand
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * This object contains wrappers for case classes that are common across
    + * Spark 2.1 and 2.2 but whose parameter lists changed between versions.
    + * Below are the overridden unapply methods that make them work across
    + * both Spark 2.1 and Spark 2.2.
    + */
    +object CarbonExpressions {
    +
    +  /**
    +   * unapply method of Cast class.
    +   */
    +  object MatchCast {
    +    def unapply(expr: Expression): Option[(Attribute, DataType)] = {
    +      if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) {
    --- End diff --
   
    Done.
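
    The review comment itself is not quoted here, but the flagged line chains
    `isInstanceOf`/`asInstanceOf`; presumably the cleanup replaced it with type
    patterns, roughly like this sketch (which deliberately matches on the
    runtime type rather than the `Cast` extractor, since `Cast`'s parameter
    list differs between Spark 2.1 and 2.2; imports as in the quoted file):

        object MatchCast {
          def unapply(expr: Expression): Option[(Attribute, DataType)] = expr match {
            case cast: Cast =>
              cast.child match {
                case attr: Attribute => Some((attr, attr.dataType))
                case _ => None
              }
            case _ => None
          }
        }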


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153076321
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.execution.command.DescribeTableCommand
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * This object contains wrappers for case classes that are common across
    + * Spark 2.1 and 2.2 but whose parameter lists changed between versions.
    + * Below are the overridden unapply methods that make them work across
    + * both Spark 2.1 and Spark 2.2.
    + */
    +object CarbonExpressions {
    +
    +  /**
    +   * unapply method of Cast class.
    +   */
    +  object MatchCast {
    +    def unapply(expr: Expression): Option[(Attribute, DataType)] = {
    +      if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) {
    +        Some((expr.asInstanceOf[Cast].child.asInstanceOf[Attribute], expr.asInstanceOf[Cast].child
    +          .dataType))
    +      } else {
    +        None
    +      }
    +    }
    +  }
    +
    +  /**
    +   * unapply method of Describe Table format.
    +   */
    +  object CarbonDescribeTable {
    +    def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = {
    +      if (plan.isInstanceOf[DescribeTableCommand]) {
    --- End diff --
   
    Done
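
    Similarly, the `DescribeTableCommand` checks collapse into a single type
    pattern; the `isInstanceOf` tests on its statically typed fields are
    redundant and can simply be dropped. A hedged sketch of the likely cleanup:

        object CarbonDescribeTable {
          def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] =
            plan match {
              case desc: DescribeTableCommand =>
                Some((desc.table, desc.partitionSpec, desc.isExtended))
              case _ => None
            }
        }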


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153076325
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.execution.command.DescribeTableCommand
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * This object contains wrappers for case classes that are common across
    + * Spark 2.1 and 2.2 but whose parameter lists changed between versions.
    + * Below are the overridden unapply methods that make them work across
    + * both Spark 2.1 and Spark 2.2.
    + */
    +object CarbonExpressions {
    +
    +  /**
    +   * unapply method of Cast class.
    +   */
    +  object MatchCast {
    +    def unapply(expr: Expression): Option[(Attribute, DataType)] = {
    +      if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) {
    +        Some((expr.asInstanceOf[Cast].child.asInstanceOf[Attribute], expr.asInstanceOf[Cast].child
    +          .dataType))
    +      } else {
    +        None
    +      }
    +    }
    +  }
    +
    +  /**
    +   * unapply method of Describe Table format.
    +   */
    +  object CarbonDescribeTable {
    +    def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = {
    +      if (plan.isInstanceOf[DescribeTableCommand]) {
    +        val describeTableCommand = plan.asInstanceOf[DescribeTableCommand]
    +        if (describeTableCommand.table.isInstanceOf[TableIdentifier] &&
    +            describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec] &&
    +            describeTableCommand.isExtended.isInstanceOf[Boolean]) {
    +          Some(describeTableCommand.table,
    +            describeTableCommand.partitionSpec,
    +            describeTableCommand.isExtended)
    +        } else {
    +          None
    +        }
    +      } else {
    +        None
    +      }
    +    }
    +  }
    +
    +  /**
    +   * unapply method of SubqueryAlias.
    +   */
    +  object CarbonSubqueryAlias {
    +    def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = {
    +      if (plan.isInstanceOf[SubqueryAlias]) {
    --- End diff --
   
    Done
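
    For `SubqueryAlias`, matching on the type and reading the fields keeps one
    definition working across both versions, since the extractor's arity
    changed between Spark 2.1 (three parameters) and 2.2 (two). A hedged
    sketch:

        object CarbonSubqueryAlias {
          def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = plan match {
            case alias: SubqueryAlias => Some((alias.alias, alias.child))
            case _ => None
          }
        }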


---