[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...


[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6457/



---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8143/



---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/72/



---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6461/



---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6461/
    This is already failing in other builds as well; it seems to be a build environment issue.


---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    @jackylk Now that Spark 2.3.2 is about to be released, will this PR work with the whole Spark 2.3 branch, including 2.3.2?
    As I mentioned before, rebasing onto Spark 2.3.2 should not be much of a problem; since these are minor versions, the interfaces have remained intact so far, so the rebasing effort for Spark 2.3.2 should be very minimal.



---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213681240
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -122,7 +122,7 @@ class CarbonAppendableStreamSink(
             className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
             jobId = batchId.toString,
             outputPath = fileLogPath,
    -        isAppend = false)
    +        false)
    --- End diff --
   
    Please keep the old code here.


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213681553
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -421,13 +421,13 @@ class CarbonScanRDD[T: ClassTag](
               // create record reader for row format
               DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
               val inputFormat = new CarbonStreamInputFormat
    -          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    -            .asInstanceOf[CarbonStreamRecordReader]
    -          streamReader.setVectorReader(vectorReader)
    -          streamReader.setInputMetricsStats(inputMetricsStats)
    +          inputFormat.setVectorReader(vectorReader)
    --- End diff --
   
    Create a method named `setIsVectorReader`.


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213686265
 
    --- Diff: integration/spark-datasource/pom.xml ---
    @@ -192,5 +192,132 @@
             <maven.test.skip>true</maven.test.skip>
           </properties>
         </profile>
    +    <profile>
    +      <id>spark-2.1</id>
    +      <properties>
    +        <spark.version>2.1.0</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +      <build>
    +        <plugins>
    +          <plugin>
    +            <groupId>org.apache.maven.plugins</groupId>
    +            <artifactId>maven-compiler-plugin</artifactId>
    +            <configuration>
    +              <excludes>
    +                <exclude>src/main/spark2.2</exclude>
    +                <exclude>src/main/spark2.3</exclude>
    +                <exclude>src/main/commonTo2.2And2.3</exclude>
    --- End diff --
   
    I think we only require 2 packages here:
    1. spark2.1andspark2.2
    2. spark2.3plus


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213688091
 
    --- Diff: integration/spark-datasource/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.parquet.column.Dictionary;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * Adapter class which handles the columnar vector reading of the carbondata
    + * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    + * handles the complexity of spark 2.1 version related api changes since
    + * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    + */
    +public class CarbonVectorProxy {
    --- End diff --
   
    2.1 and 2.2 should have the same proxy implementation.


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213689426
 
    --- Diff: integration/spark-datasource/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.Dictionary;
    +import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * Adapter class which handles the columnar vector reading of the carbondata
    + * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    + * handles the complexity of spark 2.3 version related api changes since
    + * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    + */
    +public class CarbonVectorProxy {
    +
    +    private ColumnarBatch columnarBatch;
    +    private WritableColumnVector[] columnVectors;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnVectors = ColumnVectorFactory
    +                .getColumnVector(memMode, new StructType(structFileds), rowNum);
    +        columnarBatch = new ColumnarBatch(columnVectors);
    +        columnarBatch.setNumRows(rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnVectors = ColumnVectorFactory
    +                .getColumnVector(memMode, outputSchema, rowNum);
    +        columnarBatch = new ColumnarBatch(columnVectors);
    +        columnarBatch.setNumRows(rowNum);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.numRows();
    +    }
    +
    +    public Object reserveDictionaryIds(int capacity, int ordinal) {
    +        return columnVectors[ordinal].reserveDictionaryIds(capacity);
    +    }
    +
    +    /**
    +     * This API will return a columnvector from a batch of column vector rows
    +     * based on the ordinal
    +     *
    +     * @param ordinal
    +     * @return
    +     */
    +    public WritableColumnVector column(int ordinal) {
    +        return (WritableColumnVector) columnarBatch.column(ordinal);
    +    }
    +
    +    public WritableColumnVector getColumnVector(int ordinal) {
    +        return columnVectors[ordinal];
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        for (WritableColumnVector col : columnVectors) {
    +            col.reset();
    +        }
    +    }
    +
    +    public void resetDictionaryIds(int ordinal) {
    +        columnVectors[ordinal].getDictionaryIds().reset();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    public void putRowToColumnBatch(int rowId, Object value, int offset) {
    +        org.apache.spark.sql.types.DataType t = dataType(offset);
    +        if (null == value) {
    +            putNull(rowId, offset);
    +        } else {
    +            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
    +                putBoolean(rowId, (boolean) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
    +                putByte(rowId, (byte) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
    +                putShort(rowId, (short) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
    +                putInt(rowId, (int) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
    +                putLong(rowId, (long) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
    +                putFloat(rowId, (float) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
    +                putDouble(rowId, (double) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
    +                UTF8String v = (UTF8String) value;
    +                putByteArray(rowId, v.getBytes(), offset);
    +            } else if (t instanceof DecimalType) {
    +                DecimalType dt = (DecimalType) t;
    +                Decimal d = Decimal.fromDecimal(value);
    +                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
    +                    putInt(rowId, (int) d.toUnscaledLong(), offset);
    +                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
    +                    putLong(rowId, d.toUnscaledLong(), offset);
    +                } else {
    +                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
    +                    byte[] bytes = integer.toByteArray();
    +                    putByteArray(rowId, bytes, 0, bytes.length, offset);
    +                }
    +            } else if (t instanceof CalendarIntervalType) {
    +                CalendarInterval c = (CalendarInterval) value;
    +                columnVectors[offset].getChild(0).putInt(rowId, c.months);
    +                columnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
    +            } else if (t instanceof org.apache.spark.sql.types.DateType) {
    +                putInt(rowId, (int) value, offset);
    +            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
    +                putLong(rowId, (long) value, offset);
    +            }
    +        }
    +    }
    +
    +    public void putBoolean(int rowId, boolean value, int ordinal) {
    +        columnVectors[ordinal].putBoolean(rowId, (boolean) value);
    +    }
    +
    +    public void putByte(int rowId, byte value, int ordinal) {
    +        columnVectors[ordinal].putByte(rowId, (byte) value);
    +    }
    +
    +    public void putShort(int rowId, short value, int ordinal) {
    +        columnVectors[ordinal].putShort(rowId, (short) value);
    +    }
    +
    +    public void putInt(int rowId, int value, int ordinal) {
    +        columnVectors[ordinal].putInt(rowId, (int) value);
    +    }
    +
    +    public void putDictionaryInt(int rowId, int value, int ordinal) {
    +        columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value);
    +    }
    +
    +    public void putFloat(int rowId, float value, int ordinal) {
    +        columnVectors[ordinal].putFloat(rowId, (float) value);
    +    }
    +
    +    public void putLong(int rowId, long value, int ordinal) {
    +        columnVectors[ordinal].putLong(rowId, (long) value);
    +    }
    +
    +    public void putDouble(int rowId, double value, int ordinal) {
    +        columnVectors[ordinal].putDouble(rowId, (double) value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int ordinal) {
    +        columnVectors[ordinal].putByteArray(rowId, (byte[]) value);
    +    }
    +
    +    public void putInts(int rowId, int count, int value, int ordinal) {
    +        columnVectors[ordinal].putInts(rowId, count, value);
    +    }
    +
    +    public void putShorts(int rowId, int count, short value, int ordinal) {
    +        columnVectors[ordinal].putShorts(rowId, count, value);
    +    }
    +
    +    public void putLongs(int rowId, int count, long value, int ordinal) {
    +        columnVectors[ordinal].putLongs(rowId, count, value);
    +    }
    +
    +    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
    +        columnVectors[ordinal].putDecimal(rowId, value, precision);
    +
    +    }
    +
    +    public void putDoubles(int rowId, int count, double value, int ordinal) {
    +        columnVectors[ordinal].putDoubles(rowId, count, value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
    +        columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
    +    }
    +
    +    public void putNull(int rowId, int ordinal) {
    +        columnVectors[ordinal].putNull(rowId);
    +    }
    +
    +    public void putNulls(int rowId, int count, int ordinal) {
    +        columnVectors[ordinal].putNulls(rowId, count);
    +    }
    +
    +    public void putNotNull(int rowId, int ordinal) {
    +        columnVectors[ordinal].putNotNull(rowId);
    +    }
    +
    +    public void putNotNulls(int rowId, int count, int ordinal) {
    +        columnVectors[ordinal].putNotNulls(rowId, count);
    +    }
    +
    +    public boolean isNullAt(int rowId, int ordinal) {
    +        return columnVectors[ordinal].isNullAt(rowId);
    +    }
    +
    +    public boolean hasDictionary(int ordinal) {
    +        return columnVectors[ordinal].hasDictionary();
    +    }
    +
    +    public void setDictionary(Object dictionary, int ordinal) {
    --- End diff --
   
    Please make sure we set the correct dictionary. Spark 2.3 does not expect org.apache.parquet.column.Dictionary.
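    A minimal Scala sketch of the kind of guard this implies (a hypothetical helper, not code from this PR; it assumes Spark 2.3's org.apache.spark.sql.execution.vectorized.Dictionary and WritableColumnVector):

        import org.apache.spark.sql.execution.vectorized.{Dictionary, WritableColumnVector}

        object DictionarySketch {
          // Only hand Spark 2.3's own Dictionary type to the vector; a Parquet
          // Dictionary (as used by the 2.1/2.2 code path) must not reach here.
          def setDictionarySafely(vector: WritableColumnVector, dictionary: AnyRef): Unit = {
            dictionary match {
              case d: Dictionary => vector.setDictionary(d)
              case null => vector.setDictionary(null)
              case other => throw new IllegalArgumentException(
                s"Expected ${classOf[Dictionary].getName} for Spark 2.3, got ${other.getClass.getName}")
            }
          }
        }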


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213689912
 
    --- Diff: integration/spark2/pom.xml ---
    @@ -276,6 +312,8 @@
                 <configuration>
                   <excludes>
                     <exclude>src/main/spark2.1</exclude>
    +                <exclude>src/main/spark2.3</exclude>
    +                <exclude>src/main/commonTo2.2And2.3</exclude>
    --- End diff --
   
    I think there is no need to include these here.


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213695426
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
    @@ -700,3 +724,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         }
       }
     }
    +
    +class CarbonPhysicalPlanException extends Exception {
    +
    +}
    --- End diff --
   
    remove the empty braces


---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8148/



---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213700919
 
    --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
    +import org.apache.spark.sql.types.{DataType, StringType}
    +
    +/**
    + * Custom expression to override the deterministic property .
    + */
    +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
    --- End diff --
   
    I don't see any differences; why is it copied 3 times?


---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/77/



---

[GitHub] carbondata issue #2642: [CARBONDATA-2532][Integration] Carbon to support spa...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2642
 
    Please check the build failures in the 2.3 CI: http://136.243.101.176:8080/job/ManualApacheCarbonPRBuilder2.1/172/


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213902360
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -122,7 +122,7 @@ class CarbonAppendableStreamSink(
             className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
             jobId = batchId.toString,
             outputPath = fileLogPath,
    -        isAppend = false)
    +        false)
    --- End diff --
   
    In Spark 2.3 the `isAppend` parameter has been renamed to `dynamicPartitionOverwrite`.
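    For reference, a minimal sketch of why a positional argument keeps one source file compiling against both signatures (a hypothetical helper; `className` stands in for the commit protocol class name read from the session conf in the diff above):

        import org.apache.spark.internal.io.FileCommitProtocol

        object CommitProtocolSketch {
          // The fourth parameter is `isAppend` in Spark 2.1/2.2 and
          // `dynamicPartitionOverwrite` in Spark 2.3; passing `false`
          // positionally compiles unchanged against both versions.
          def createCommitter(className: String, batchId: Long, fileLogPath: String): FileCommitProtocol = {
            FileCommitProtocol.instantiate(
              className,
              batchId.toString,
              fileLogPath,
              false)
          }
        }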


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213903286
 
    --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
    +import org.apache.spark.sql.types.{DataType, StringType}
    +
    +/**
    + * Custom expression to override the deterministic property .
    + */
    +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
    --- End diff --
   
    In 2.1 and 2.2:
        override def deterministic: Boolean = true
    In 2.3:
        override lazy val deterministic: Boolean = true
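    For reference, a minimal sketch of the version split (not the PR's exact class; only the `deterministic` member has to differ between the per-version copies):

        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.catalyst.expressions.Expression
        import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
        import org.apache.spark.sql.types.DataType

        // Spark 2.1/2.2 variant: Expression declares `deterministic` as a def.
        case class CustomDeterministicExpression(nonDt: Expression) extends Expression {
          override def deterministic: Boolean = true
          // The Spark 2.3 copy must instead declare:
          //   override lazy val deterministic: Boolean = true
          override def nullable: Boolean = true
          override def dataType: DataType = nonDt.dataType
          override def children: Seq[Expression] = Seq(nonDt)
          override def eval(input: InternalRow): Any = nonDt.eval(input)
          override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
            nonDt.genCode(ctx)
        }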


---

[GitHub] carbondata pull request #2642: [CARBONDATA-2532][Integration] Carbon to supp...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2642#discussion_r213905161
 
    --- Diff: integration/spark-datasource/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql;
    +
    +import java.math.BigInteger;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.Dictionary;
    +import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * Adapter class which handles the columnar vector reading of the carbondata
    + * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    + * handles the complexity of spark 2.3 version related api changes since
    + * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    + */
    +public class CarbonVectorProxy {
    +
    +    private ColumnarBatch columnarBatch;
    +    private WritableColumnVector[] columnVectors;
    +
    +    /**
    +     * Adapter class which handles the columnar vector reading of the carbondata
    +     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
    +     * handles the complexity of spark 2.3 version related api changes since
    +     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
    +     *
    +     * @param memMode       which represent the type onheap or offheap vector.
    +     * @param rowNum        rows number for vector reading
    +     * @param structFileds, metadata related to current schema of table.
    +     */
    +    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
    +        columnVectors = ColumnVectorFactory
    +                .getColumnVector(memMode, new StructType(structFileds), rowNum);
    +        columnarBatch = new ColumnarBatch(columnVectors);
    +        columnarBatch.setNumRows(rowNum);
    +    }
    +
    +    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
    +        columnVectors = ColumnVectorFactory
    +                .getColumnVector(memMode, outputSchema, rowNum);
    +        columnarBatch = new ColumnarBatch(columnVectors);
    +        columnarBatch.setNumRows(rowNum);
    +    }
    +
    +    /**
    +     * Returns the number of rows for read, including filtered rows.
    +     */
    +    public int numRows() {
    +        return columnarBatch.numRows();
    +    }
    +
    +    public Object reserveDictionaryIds(int capacity, int ordinal) {
    +        return columnVectors[ordinal].reserveDictionaryIds(capacity);
    +    }
    +
    +    /**
    +     * This API will return a columnvector from a batch of column vector rows
    +     * based on the ordinal
    +     *
    +     * @param ordinal
    +     * @return
    +     */
    +    public WritableColumnVector column(int ordinal) {
    +        return (WritableColumnVector) columnarBatch.column(ordinal);
    +    }
    +
    +    public WritableColumnVector getColumnVector(int ordinal) {
    +        return columnVectors[ordinal];
    +    }
    +
    +    /**
    +     * Resets this column for writing. The currently stored values are no longer accessible.
    +     */
    +    public void reset() {
    +        for (WritableColumnVector col : columnVectors) {
    +            col.reset();
    +        }
    +    }
    +
    +    public void resetDictionaryIds(int ordinal) {
    +        columnVectors[ordinal].getDictionaryIds().reset();
    +    }
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public InternalRow getRow(int rowId) {
    +        return columnarBatch.getRow(rowId);
    +    }
    +
    +
    +    /**
    +     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    +     */
    +    public Object getColumnarBatch() {
    +        return columnarBatch;
    +    }
    +
    +    /**
    +     * Called to close all the columns in this batch. It is not valid to access the data after
    +     * calling this. This must be called at the end to clean up memory allocations.
    +     */
    +    public void close() {
    +        columnarBatch.close();
    +    }
    +
    +    /**
    +     * Sets the number of rows in this batch.
    +     */
    +    public void setNumRows(int numRows) {
    +        columnarBatch.setNumRows(numRows);
    +    }
    +
    +    public void putRowToColumnBatch(int rowId, Object value, int offset) {
    +        org.apache.spark.sql.types.DataType t = dataType(offset);
    +        if (null == value) {
    +            putNull(rowId, offset);
    +        } else {
    +            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
    +                putBoolean(rowId, (boolean) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
    +                putByte(rowId, (byte) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
    +                putShort(rowId, (short) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
    +                putInt(rowId, (int) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
    +                putLong(rowId, (long) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
    +                putFloat(rowId, (float) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
    +                putDouble(rowId, (double) value, offset);
    +            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
    +                UTF8String v = (UTF8String) value;
    +                putByteArray(rowId, v.getBytes(), offset);
    +            } else if (t instanceof DecimalType) {
    +                DecimalType dt = (DecimalType) t;
    +                Decimal d = Decimal.fromDecimal(value);
    +                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
    +                    putInt(rowId, (int) d.toUnscaledLong(), offset);
    +                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
    +                    putLong(rowId, d.toUnscaledLong(), offset);
    +                } else {
    +                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
    +                    byte[] bytes = integer.toByteArray();
    +                    putByteArray(rowId, bytes, 0, bytes.length, offset);
    +                }
    +            } else if (t instanceof CalendarIntervalType) {
    +                CalendarInterval c = (CalendarInterval) value;
    +                columnVectors[offset].getChild(0).putInt(rowId, c.months);
    +                columnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
    +            } else if (t instanceof org.apache.spark.sql.types.DateType) {
    +                putInt(rowId, (int) value, offset);
    +            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
    +                putLong(rowId, (long) value, offset);
    +            }
    +        }
    +    }
    +
    +    public void putBoolean(int rowId, boolean value, int ordinal) {
    +        columnVectors[ordinal].putBoolean(rowId, (boolean) value);
    +    }
    +
    +    public void putByte(int rowId, byte value, int ordinal) {
    +        columnVectors[ordinal].putByte(rowId, (byte) value);
    +    }
    +
    +    public void putShort(int rowId, short value, int ordinal) {
    +        columnVectors[ordinal].putShort(rowId, (short) value);
    +    }
    +
    +    public void putInt(int rowId, int value, int ordinal) {
    +        columnVectors[ordinal].putInt(rowId, (int) value);
    +    }
    +
    +    public void putDictionaryInt(int rowId, int value, int ordinal) {
    +        columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value);
    +    }
    +
    +    public void putFloat(int rowId, float value, int ordinal) {
    +        columnVectors[ordinal].putFloat(rowId, (float) value);
    +    }
    +
    +    public void putLong(int rowId, long value, int ordinal) {
    +        columnVectors[ordinal].putLong(rowId, (long) value);
    +    }
    +
    +    public void putDouble(int rowId, double value, int ordinal) {
    +        columnVectors[ordinal].putDouble(rowId, (double) value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int ordinal) {
    +        columnVectors[ordinal].putByteArray(rowId, (byte[]) value);
    +    }
    +
    +    public void putInts(int rowId, int count, int value, int ordinal) {
    +        columnVectors[ordinal].putInts(rowId, count, value);
    +    }
    +
    +    public void putShorts(int rowId, int count, short value, int ordinal) {
    +        columnVectors[ordinal].putShorts(rowId, count, value);
    +    }
    +
    +    public void putLongs(int rowId, int count, long value, int ordinal) {
    +        columnVectors[ordinal].putLongs(rowId, count, value);
    +    }
    +
    +    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
    +        columnVectors[ordinal].putDecimal(rowId, value, precision);
    +
    +    }
    +
    +    public void putDoubles(int rowId, int count, double value, int ordinal) {
    +        columnVectors[ordinal].putDoubles(rowId, count, value);
    +    }
    +
    +    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
    +        columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
    +    }
    +
    +    public void putNull(int rowId, int ordinal) {
    +        columnVectors[ordinal].putNull(rowId);
    +    }
    +
    +    public void putNulls(int rowId, int count, int ordinal) {
    +        columnVectors[ordinal].putNulls(rowId, count);
    +    }
    +
    +    public void putNotNull(int rowId, int ordinal) {
    +        columnVectors[ordinal].putNotNull(rowId);
    +    }
    +
    +    public void putNotNulls(int rowId, int count, int ordinal) {
    +        columnVectors[ordinal].putNotNulls(rowId, count);
    +    }
    +
    +    public boolean isNullAt(int rowId, int ordinal) {
    +        return columnVectors[ordinal].isNullAt(rowId);
    +    }
    +
    +    public boolean hasDictionary(int ordinal) {
    +        return columnVectors[ordinal].hasDictionary();
    +    }
    +
    +    public void setDictionary(Object dictionary, int ordinal) {
    --- End diff --
   
    Yes, the proxy layers corresponding to Spark 2.3 and Spark 2.1/2.2 set the appropriate dictionary instance.


---