[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...


[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r195972758
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
                   // named expression list otherwise update the list and add it to set
                   if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
                     namedExpressionList +=
    -                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
    +                CarbonCompilerUtil.createAliasRef(expressions.head,
    +                  name + "_ sum",
    +                  NamedExpression.newExprId,
                       alias.qualifier,
                       Some(alias.metadata),
    -                  alias.isGenerated)
    +                  Some(alias))
                     validExpressionsMap += AggExpToColumnMappingModel(sumExp)
                   }
                   // check with same expression already count is present then do not add to
                   // named expression list otherwise update the list and add it to set
                   if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
                     namedExpressionList +=
    -                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
    -                  alias.qualifier,
    -                  Some(alias.metadata),
    -                  alias.isGenerated)
    +                  CarbonCompilerUtil.createAliasRef(expressions.last, name + "_ count",
    --- End diff --
   
    Rename CarbonCompilerUtil; the current name suggests it is a Carbon compiler utility.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r195975666
 
    --- Diff: pom.xml ---
    @@ -582,6 +582,59 @@
             </plugins>
           </build>
         </profile>
    +    <profile>
    --- End diff --
   
    Check whether there is a way to make the profile common for Spark 2.2 and 2.3 and pass only the version-specific details through overrides or parameters.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r195976603
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -126,9 +126,9 @@ object CarbonReflectionUtils {
       }
     
       def getLogicalRelation(relation: BaseRelation,
    -      expectedOutputAttributes: Seq[Attribute],
    -      catalogTable: Option[CatalogTable],
    -      isStreaming:Boolean): LogicalRelation = {
    +                         expectedOutputAttributes: Seq[Attribute],
    --- End diff --
   
    Wrong indentation; use the formatter at carbon/dev/intellijFormatter.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r195980220
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -71,15 +72,10 @@
     import org.apache.hadoop.mapreduce.TaskAttemptContext;
     import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     import org.apache.spark.memory.MemoryMode;
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
     import org.apache.spark.sql.catalyst.InternalRow;
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    -import org.apache.spark.sql.execution.vectorized.ColumnVector;
    -import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    -import org.apache.spark.sql.types.CalendarIntervalType;
    -import org.apache.spark.sql.types.Decimal;
    -import org.apache.spark.sql.types.DecimalType;
    -import org.apache.spark.sql.types.StructField;
    -import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.sql.types.*;
    --- End diff --
   
    Raise another sub-issue and PR to refactor the Spark and Hadoop dependencies of Carbon streaming:
    1) Move CarbonStreamRecordReader.java to spark2 and CarbonStreamInputFormat.java to carbon-hadoop, and use CarbonStreamRecordReader through reflection.
    2) Remove the Spark dependency from carbon-streaming.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196307852
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala ---
    @@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach {
           sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS  ")
         } catch {
           case e: Exception =>
    -        assert(e.getMessage.contains("no viable alternative at input"))
    +        assert(true)
    --- End diff --
   
    Use `intercept` to catch the exception and assert its message.
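    For reference, a minimal ScalaTest sketch of the suggested pattern, reusing the statement from the quoted diff (the exact parser message may vary by Spark version):
    ```
    // Hedged sketch: intercept returns the thrown exception so its message can be asserted.
    val ex = intercept[Exception] {
      sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS  ")
    }
    assert(ex.getMessage.contains("no viable alternative at input"))
    ```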


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196307942
 
    --- Diff: integration/spark-common/pom.xml ---
    @@ -65,6 +65,11 @@
           <artifactId>scalatest_${scala.binary.version}</artifactId>
           <scope>provided</scope>
         </dependency>
    +      <dependency>
    +          <groupId>org.apache.zookeeper</groupId>
    --- End diff --
   
    Why is this dependency introduced?


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196309518
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -115,7 +109,7 @@
     
       // vectorized reader
       private StructType outputSchema;
    -  private ColumnarBatch columnarBatch;
    +  private CarbonSparkVectorReader vectorProxy;
    --- End diff --
   
    I will remove this interface; since we are moving CarbonStreamRecordReader to spark2, this interface will not be required.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196309854
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -115,7 +109,7 @@
     
       // vectorized reader
       private StructType outputSchema;
    -  private ColumnarBatch columnarBatch;
    +  private CarbonSparkVectorReader vectorProxy;
    --- End diff --
   
    Mainly because we cannot have any common APIs: the ColumnVector and ColumnarBatch package itself changed between Spark versions, so we cannot extract a shared implementation.
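    For context, a hedged sketch of the split that prevents a shared implementation: the vectorized classes live in different packages per Spark version, so only one of them is on the classpath at a time (package names as I recall them; please verify against the target releases):
    ```
    // Hedged sketch: probe which ColumnarBatch class is available, which is why a
    // per-profile CarbonVectorProxy is compiled instead of one common API.
    val columnarBatchClass =
      try {
        Class.forName("org.apache.spark.sql.vectorized.ColumnarBatch") // Spark 2.3+
      } catch {
        case _: ClassNotFoundException =>
          Class.forName("org.apache.spark.sql.execution.vectorized.ColumnarBatch") // Spark 2.1 / 2.2
      }
    ```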


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310131
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -418,36 +412,47 @@ private boolean isScanRequired(BlockletHeader header) {
       }
     
       private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
    +    Constructor cons = null;
         // if filter is null and output projection is empty, use the row number of blocklet header
    -    if (skipScanData) {
    -      int rowNums = header.getBlocklet_info().getNum_rows();
    -      columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
    -      columnarBatch.setNumRows(rowNums);
    -      input.skipBlockletData(true);
    -      return rowNums > 0;
    -    }
    -
    -    input.readBlockletData(header);
    -    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
         int rowNum = 0;
    -    if (null == filter) {
    -      while (input.hasNext()) {
    -        readRowFromStream();
    -        putRowToColumnBatch(rowNum++);
    +    try {
    +      String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy";
    --- End diff --
   
    Since you are using `CarbonVectorProxy`, can you remove the spark dependency in this stream module?
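    For illustration, a hedged Scala sketch of the reflective construction shown in the Java diff above; the constructor shape (MemoryMode, StructType, int) is taken from the quoted CarbonVectorProxy sources, and fully removing the Spark dependency would also require abstracting these argument types:
    ```
    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.types.StructType

    // Hedged sketch: build the version-specific proxy reflectively so this module
    // has no compile-time reference to the Spark vectorized classes themselves.
    def createVectorProxy(outputSchema: StructType, rowNum: Int): AnyRef = {
      val clazz = Class.forName("org.apache.spark.sql.CarbonVectorProxy")
      val cons = clazz.getDeclaredConstructor(
        classOf[MemoryMode], classOf[StructType], classOf[Int])
      cons.newInstance(MemoryMode.OFF_HEAP, outputSchema, Integer.valueOf(rowNum))
        .asInstanceOf[AnyRef]
    }
    ```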


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310194
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -127,7 +127,7 @@ class CarbonAppendableStreamSink(
             className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
             jobId = batchId.toString,
             outputPath = fileLogPath,
    -        isAppend = false)
    +        false)
    --- End diff --
   
    no need to modify this


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310344
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -247,6 +252,32 @@ object CarbonReflectionUtils {
         isFormatted
       }
     
    +
    --- End diff --
   
    remove empty line


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310355
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
    @@ -71,15 +72,10 @@
     import org.apache.hadoop.mapreduce.TaskAttemptContext;
     import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     import org.apache.spark.memory.MemoryMode;
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
     import org.apache.spark.sql.catalyst.InternalRow;
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    -import org.apache.spark.sql.execution.vectorized.ColumnVector;
    -import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    -import org.apache.spark.sql.types.CalendarIntervalType;
    -import org.apache.spark.sql.types.Decimal;
    -import org.apache.spark.sql.types.DecimalType;
    -import org.apache.spark.sql.types.StructField;
    -import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.sql.types.*;
    --- End diff --
   
    Handled


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310531
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
    @@ -247,6 +252,32 @@ object CarbonReflectionUtils {
         isFormatted
       }
     
    +
    +  def getRowDataSourceScanExecObj(relation: LogicalRelation,
    --- End diff --
   
    please make the indentation like:
    ```
      def getRowDataSourceScanExecObj(
         relation: LogicalRelation,
         output: Seq[Attribute],
         pushedFilters: Seq[Filter]): RowDataSourceScanExec = {
    ```


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310806
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
    @@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
     import org.apache.carbondata.core.features.TableOperation
     import org.apache.carbondata.core.util.CarbonProperties
     
    -/**
    - * Carbon strategies for ddl commands
    - */
    +  /** Carbon strategies for ddl commands
    --- End diff --
   
    move the comment block to the next line, like:
    ```
    /**
     * Carbon ...
     */
    ```


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196310953
 
    --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +public class CarbonVectorProxy implements CarbonSparkVectorReader {
    --- End diff --
   
    Add a class-level comment for this class and add the @InterfaceAudience.Internal annotation.


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196313064
 
    --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +public class CarbonVectorProxy implements CarbonSparkVectorReader {
    +
    +    private ColumnVector columnVector;
    +    private ColumnarBatch columnarBatch;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.capacity();
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        columnarBatch.reset();
    +    }
    +
    +
    --- End diff --
   
    remove extra empty line


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196313120
 
    --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +public class CarbonVectorProxy implements CarbonSparkVectorReader {
    +
    +    private ColumnVector columnVector;
    +    private ColumnarBatch columnarBatch;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.capacity();
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        columnarBatch.reset();
    +    }
    +
    +
    +    public void putRowToColumnBatch(int rowId, Object value, int offset) {
    +        this.columnVector = columnarBatch.column(offset);
    +        org.apache.spark.sql.types.DataType t = columnVector.dataType();
    +        if (null == value) {
    +            columnVector.putNull(rowId);
    +        } else {
    +            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
    +                columnVector.putBoolean(rowId, (boolean) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
    +                columnVector.putByte(rowId, (byte) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
    +                columnVector.putShort(rowId, (short) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
    +                columnVector.putLong(rowId, (long) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
    +                columnVector.putFloat(rowId, (float) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
    +                columnVector.putDouble(rowId, (double) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
    +                UTF8String v = (UTF8String) value;
    +                columnVector.putByteArray(rowId, v.getBytes());
    +            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
    +                DecimalType dt = (DecimalType) t;
    +                Decimal d = Decimal.fromDecimal(value);
    +                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
    +                    columnVector.putInt(rowId, (int) d.toUnscaledLong());
    +                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
    +                    columnVector.putLong(rowId, d.toUnscaledLong());
    +                } else {
    +                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
    +                    byte[] bytes = integer.toByteArray();
    +                    columnVector.putByteArray(rowId, bytes, 0, bytes.length);
    +                }
    +            } else if (t instanceof CalendarIntervalType) {
    +                CalendarInterval c = (CalendarInterval) value;
    +                columnVector.getChildColumn(0).putInt(rowId, c.months);
    +                columnVector.getChildColumn(1).putLong(rowId, c.microseconds);
    +            } else if (t instanceof org.apache.spark.sql.types.DateType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
    +                columnVector.putLong(rowId, (long) value);
    +            }
    +        }
    +
    --- End diff --
   
    remove empty line


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196313170
 
    --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +public class CarbonVectorProxy implements CarbonSparkVectorReader {
    +
    +    private ColumnVector columnVector;
    +    private ColumnarBatch columnarBatch;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.capacity();
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        columnarBatch.reset();
    +    }
    +
    +
    +    public void putRowToColumnBatch(int rowId, Object value, int offset) {
    +        this.columnVector = columnarBatch.column(offset);
    +        org.apache.spark.sql.types.DataType t = columnVector.dataType();
    +        if (null == value) {
    +            columnVector.putNull(rowId);
    +        } else {
    +            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
    +                columnVector.putBoolean(rowId, (boolean) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
    +                columnVector.putByte(rowId, (byte) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
    +                columnVector.putShort(rowId, (short) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
    +                columnVector.putLong(rowId, (long) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
    +                columnVector.putFloat(rowId, (float) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
    +                columnVector.putDouble(rowId, (double) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
    +                UTF8String v = (UTF8String) value;
    +                columnVector.putByteArray(rowId, v.getBytes());
    +            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
    +                DecimalType dt = (DecimalType) t;
    +                Decimal d = Decimal.fromDecimal(value);
    +                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
    +                    columnVector.putInt(rowId, (int) d.toUnscaledLong());
    +                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
    +                    columnVector.putLong(rowId, d.toUnscaledLong());
    +                } else {
    +                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
    +                    byte[] bytes = integer.toByteArray();
    +                    columnVector.putByteArray(rowId, bytes, 0, bytes.length);
    +                }
    +            } else if (t instanceof CalendarIntervalType) {
    +                CalendarInterval c = (CalendarInterval) value;
    +                columnVector.getChildColumn(0).putInt(rowId, c.months);
    +                columnVector.getChildColumn(1).putLong(rowId, c.microseconds);
    +            } else if (t instanceof org.apache.spark.sql.types.DateType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
    +                columnVector.putLong(rowId, (long) value);
    +            }
    +        }
    +
    +    }
    +
    +    public void putBoolean(int rowId, boolean value, int ordinal) {
    +
    --- End diff --
   
    remove all these empty lines in this class


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196313373
 
    --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +public class CarbonVectorProxy implements CarbonSparkVectorReader {
    +
    +    private ColumnVector columnVector;
    +    private ColumnarBatch columnarBatch;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.capacity();
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        columnarBatch.reset();
    +    }
    +
    +
    +    public void putRowToColumnBatch(int rowId, Object value, int offset) {
    +        this.columnVector = columnarBatch.column(offset);
    +        org.apache.spark.sql.types.DataType t = columnVector.dataType();
    +        if (null == value) {
    +            columnVector.putNull(rowId);
    +        } else {
    +            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
    +                columnVector.putBoolean(rowId, (boolean) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
    +                columnVector.putByte(rowId, (byte) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
    +                columnVector.putShort(rowId, (short) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
    +                columnVector.putLong(rowId, (long) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
    +                columnVector.putFloat(rowId, (float) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
    +                columnVector.putDouble(rowId, (double) value);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
    +                UTF8String v = (UTF8String) value;
    +                columnVector.putByteArray(rowId, v.getBytes());
    +            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
    +                DecimalType dt = (DecimalType) t;
    +                Decimal d = Decimal.fromDecimal(value);
    +                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
    +                    columnVector.putInt(rowId, (int) d.toUnscaledLong());
    +                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
    +                    columnVector.putLong(rowId, d.toUnscaledLong());
    +                } else {
    +                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
    +                    byte[] bytes = integer.toByteArray();
    +                    columnVector.putByteArray(rowId, bytes, 0, bytes.length);
    +                }
    +            } else if (t instanceof CalendarIntervalType) {
    +                CalendarInterval c = (CalendarInterval) value;
    +                columnVector.getChildColumn(0).putInt(rowId, c.months);
    +                columnVector.getChildColumn(1).putLong(rowId, c.microseconds);
    +            } else if (t instanceof org.apache.spark.sql.types.DateType) {
    +                columnVector.putInt(rowId, (int) value);
    +            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
    +                columnVector.putLong(rowId, (long) value);
    +            }
    +        }
    +
    +    }
    +
    +    public void putBoolean(int rowId, boolean value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
    +    }
    +
    +    public void putByte(int rowId, byte value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
    +    }
    +
    +    public void putShort(int rowId, short value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putShort(rowId, (short) value);
    +    }
    +
    +    public void putInt(int rowId, int value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putInt(rowId, (int) value);
    +    }
    +
    +    public void putFloat(int rowId, float value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
    +    }
    +
    +    public void putLong(int rowId, long value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putLong(rowId, (long) value);
    +    }
    +
    +    public void putDouble(int rowId, double value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
    +    }
    +
    +    public void putInts(int rowId, int count, int value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putInts(rowId, count, value);
    +    }
    +
    +    public void putShorts(int rowId, int count, short value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putShorts(rowId, count, value);
    +    }
    +
    +    public void putLongs(int rowId, int count, long value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putLongs(rowId, count, value);
    +    }
    +
    +    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
    +        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
    +
    +    }
    +
    +    public void putDoubles(int rowId, int count, double value, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
    +    }
    +
    +    public void putNull(int rowId, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putNull(rowId);
    +    }
    +
    +    public void putNulls(int rowId, int count, int ordinal) {
    +
    +        columnarBatch.column(ordinal).putNulls(rowId, count);
    +    }
    +
    +    public boolean isNullAt(int rowId, int ordinal) {
    +
    +        return columnarBatch.column(ordinal).isNullAt(rowId);
    +    }
    +
    +    public DataType dataType(int ordinal) {
    +
    --- End diff --
   
    remove all these empty lines in this class


---

[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2366#discussion_r196314106
 
    --- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionState.scala ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
    +import org.apache.spark.sql.hive.client.HiveClient
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonHiveSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog (
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) with CarbonSessionCatalog {
    +
    +  private lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +  /**
    +   * return's the carbonEnv instance
    +   * @return
    +   */
    +  override def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +  // Initialize all listeners to the Operation bus.
    +  CarbonEnv.initListeners()
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    val isRelationRefreshed =
    +      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
    +    if (isRelationRefreshed) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +
    +  /**
    +   * returns hive client from HiveExternalCatalog
    +   *
    +   * @return
    +   */
    +  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
    +    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
    +      .asInstanceOf[HiveExternalCatalog].client
    +  }
    +
    +  def alterTableRename(oldTableIdentifier: TableIdentifier,
    +      newTableIdentifier: TableIdentifier,
    +      newTablePath: String): Unit = {
    +    getClient().runSqlHive(
    +      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
    +      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
    +    getClient().runSqlHive(
    +      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
    +      s"SET SERDEPROPERTIES" +
    +      s"('tableName'='${ newTableIdentifier.table }', " +
    +      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
    +  }
    +
    +  override def alterTable(tableIdentifier: TableIdentifier,
    --- End diff --
   
    Move the parameters to the next line; follow this convention in all methods.
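    A brief sketch of the requested formatting, applied to the alterTableRename signature quoted above (body elided):
    ```
    def alterTableRename(
        oldTableIdentifier: TableIdentifier,
        newTableIdentifier: TableIdentifier,
        newTablePath: String): Unit = {
      // body unchanged
    }
    ```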


---