[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

classic Classic list List threaded Threaded
101 messages Options
123456
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196314165
 
    --- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala ---
    @@ -0,0 +1,124 @@
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import java.net.URI
    +
    +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
    +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
    +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
    +import org.apache.spark.sql.sources.BaseRelation
    +
    +/**
    + * Create table 'using carbondata' and insert the query result into it.
    + *
    + * @param table the Catalog Table
    + * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
    + * @param query the query whose result will be insert into the new relation
    + *
    + */
    +
    --- End diff --
   
    remove empty line


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196314387
 
    --- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
    @@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
               do {
                 try {
                   LOG.info(s"starting registry-service on $hostAddress:$port")
    -              val config = RpcEnvConfig(
    +              val config = RpcUtil.getRpcEnvConfig(
    --- End diff --
   
    RPC is refactored in #2372, this should be rebased after #2372 is merged


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196341741
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -127,7 +127,7 @@ class CarbonAppendableStreamSink(
             className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
             jobId = batchId.toString,
             outputPath = fileLogPath,
    -        isAppend = false)
    +        false)
    --- End diff --
   
    in spark2.2.1 default argument name is "isAppend" and in 2.3.0 it is dynamicPartitionOverwrite. So it is required change


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196650804
 
    --- Diff: integration/spark-common/pom.xml ---
    @@ -65,6 +65,11 @@
           <artifactId>scalatest_${scala.binary.version}</artifactId>
           <scope>provided</scope>
         </dependency>
    +      <dependency>
    +          <groupId>org.apache.zookeeper</groupId>
    --- End diff --
   
    Not intentional change i guess :)


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196654884
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -65,7 +66,7 @@ object CarbonReflectionUtils {
             className,
             tableIdentifier,
             tableAlias)._1.asInstanceOf[UnresolvedRelation]
    -    } else if (SPARK_VERSION.startsWith("2.2")) {
    +    } else if (SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3")) {
    --- End diff --
   
    Fixed,added the Utility method for spark version comparison in SparkUtil.scala


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196654906
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -140,6 +142,13 @@ object CarbonReflectionUtils {
             relation,
             expectedOutputAttributes,
             catalogTable)._1.asInstanceOf[LogicalRelation]
    +    } else if (SPARK_VERSION.startsWith("2.3")) {
    --- End diff --
   
    Fixed,added the Utility method for spark version comparison in SparkUtil.scala


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196654926
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -355,18 +362,19 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       }
     
       private def getDataSourceScan(relation: LogicalRelation,
    -      output: Seq[Attribute],
    -      partitions: Seq[PartitionSpec],
    -      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
    -        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
    -      candidatePredicates: Seq[Expression],
    -      pushedFilters: Seq[Filter],
    -      metadata: Map[String, String],
    -      needDecoder: ArrayBuffer[AttributeReference],
    -      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
    +                                output: Seq[Attribute],
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196654954
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala ---
    @@ -149,8 +149,9 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
       }
     
       test("test sum*10 aggregation on big decimal column with high precision") {
    -    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
    -      sql("select sum(salary)*10 from hiveBigDecimal"))
    +    val carbonSeq = sql("select sum(salary)*10 from carbonBigDecimal_2").collect
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655020
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala ---
    @@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach {
           sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS  ")
         } catch {
           case e: Exception =>
    -        assert(e.getMessage.contains("no viable alternative at input"))
    +        assert(true)
    --- End diff --
   
    Fixed,added or condition with message as per spark 2.3.0


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655128
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
                   // named expression list otherwise update the list and add it to set
                   if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
                     namedExpressionList +=
    -                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
    +                CarbonCompilerUtil.createAliasRef(expressions.head,
    +                  name + "_ sum",
    +                  NamedExpression.newExprId,
                       alias.qualifier,
                       Some(alias.metadata),
    -                  alias.isGenerated)
    +                  Some(alias))
                     validExpressionsMap += AggExpToColumnMappingModel(sumExp)
                   }
                   // check with same expression already count is present then do not add to
                   // named expression list otherwise update the list and add it to set
                   if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
                     namedExpressionList +=
    -                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
    -                  alias.qualifier,
    -                  Some(alias.metadata),
    -                  alias.isGenerated)
    +                  CarbonCompilerUtil.createAliasRef(expressions.last, name + "_ count",
    --- End diff --
   
    Fixed,Changed the name from CarbonCompilerUtil to CarbonToSparkAdapater


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655176
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -247,6 +252,32 @@ object CarbonReflectionUtils {
         isFormatted
       }
     
    +
    --- End diff --
   
    Fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655227
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -247,6 +252,32 @@ object CarbonReflectionUtils {
         isFormatted
       }
     
    +
    +  def getRowDataSourceScanExecObj(relation: LogicalRelation,
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655245
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
    @@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
     import org.apache.carbondata.core.features.TableOperation
     import org.apache.carbondata.core.util.CarbonProperties
     
    -/**
    - * Carbon strategies for ddl commands
    - */
    +  /** Carbon strategies for ddl commands
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655288
 
    --- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionState.scala ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
    +import org.apache.spark.sql.hive.client.HiveClient
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonHiveSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog (
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) with CarbonSessionCatalog {
    +
    +  private lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +  /**
    +   * return's the carbonEnv instance
    +   * @return
    +   */
    +  override def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +  // Initialize all listeners to the Operation bus.
    +  CarbonEnv.initListeners()
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    val isRelationRefreshed =
    +      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
    +    if (isRelationRefreshed) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +
    +  /**
    +   * returns hive client from HiveExternalCatalog
    +   *
    +   * @return
    +   */
    +  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
    +    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
    +      .asInstanceOf[HiveExternalCatalog].client
    +  }
    +
    +  def alterTableRename(oldTableIdentifier: TableIdentifier,
    +      newTableIdentifier: TableIdentifier,
    +      newTablePath: String): Unit = {
    +    getClient().runSqlHive(
    +      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
    +      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
    +    getClient().runSqlHive(
    +      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
    +      s"SET SERDEPROPERTIES" +
    +      s"('tableName'='${ newTableIdentifier.table }', " +
    +      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
    +  }
    +
    +  override def alterTable(tableIdentifier: TableIdentifier,
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655419
 
    --- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala ---
    @@ -0,0 +1,124 @@
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import java.net.URI
    +
    +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
    +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
    +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
    +import org.apache.spark.sql.sources.BaseRelation
    +
    +/**
    + * Create table 'using carbondata' and insert the query result into it.
    + *
    + * @param table the Catalog Table
    + * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
    + * @param query the query whose result will be insert into the new relation
    + *
    + */
    +
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196655625
 
    --- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
    @@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
               do {
                 try {
                   LOG.info(s"starting registry-service on $hostAddress:$port")
    -              val config = RpcEnvConfig(
    +              val config = RpcUtil.getRpcEnvConfig(
    --- End diff --
   
    After analyzing the #2372 these changes are not required,so reverted


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2366: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2366
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5236/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2366: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2366
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6402/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2366: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2366
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5347/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196715002
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -71,15 +72,10 @@
     import org.apache.hadoop.mapreduce.TaskAttemptContext;
     import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     import org.apache.spark.memory.MemoryMode;
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
     import org.apache.spark.sql.catalyst.InternalRow;
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    -import org.apache.spark.sql.execution.vectorized.ColumnVector;
    -import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    -import org.apache.spark.sql.types.CalendarIntervalType;
    -import org.apache.spark.sql.types.Decimal;
    -import org.apache.spark.sql.types.DecimalType;
    -import org.apache.spark.sql.types.StructField;
    -import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.sql.types.*;
    --- End diff --
   
    Please find the task jira id
    https://issues.apache.org/jira/browse/CARBONDATA-2619


---
123456