Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2642 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6457/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2642 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8143/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2642 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/72/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2642 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6461/ --- |
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on the issue:
https://github.com/apache/carbondata/pull/2642 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6461/ It is already failing in other builds as well; this seems to be a build environment issue. --- |
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on the issue:
https://github.com/apache/carbondata/pull/2642 @jackylk Now that Spark 2.3.2 is about to be released, can this PR work with the whole Spark 2.3 branch, including 2.3.2? As I said before, rebasing onto Spark 2.3.2 should not be much of a problem: since these are minor versions, the interfaces have stayed intact so far, so the rebasing effort should be very small. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213681240 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala --- @@ -122,7 +122,7 @@ class CarbonAppendableStreamSink( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = fileLogPath, - isAppend = false) + false) --- End diff -- Please keep the old code here. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213681553 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -421,13 +421,13 @@ class CarbonScanRDD[T: ClassTag]( // create record reader for row format DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance()) val inputFormat = new CarbonStreamInputFormat - val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) - .asInstanceOf[CarbonStreamRecordReader] - streamReader.setVectorReader(vectorReader) - streamReader.setInputMetricsStats(inputMetricsStats) + inputFormat.setVectorReader(vectorReader) --- End diff -- Create method with name `setIsVectorReader` --- |
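For illustration only, a self-contained Scala sketch of the configuration style the review asks for: the flag is set on the input format (with the suggested name `setIsVectorReader`) instead of casting the created record reader and setting it there. `MockStreamInputFormat` is a stand-in, not the real CarbonStreamInputFormat.

```scala
// Mock class illustrating the suggested setter; not the CarbonData implementation.
class MockStreamInputFormat {
  private var vectorReader: Boolean = false

  // Name taken from the review suggestion.
  def setIsVectorReader(isVectorReader: Boolean): Unit = {
    vectorReader = isVectorReader
  }

  def isVectorReaderEnabled: Boolean = vectorReader
}

object SetterExample extends App {
  val inputFormat = new MockStreamInputFormat
  inputFormat.setIsVectorReader(true)
  println(inputFormat.isVectorReaderEnabled) // prints: true
}
```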
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213686265 --- Diff: integration/spark-datasource/pom.xml --- @@ -192,5 +192,132 @@ <maven.test.skip>true</maven.test.skip> </properties> </profile> + <profile> + <id>spark-2.1</id> + <properties> + <spark.version>2.1.0</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/spark2.2</exclude> + <exclude>src/main/spark2.3</exclude> + <exclude>src/main/commonTo2.2And2.3</exclude> --- End diff -- I think we only require 2 packages here: 1. spark2.1andspark2.2, 2. spark 2.3plus. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213688091 --- Diff: integration/spark-datasource/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.parquet.column.Dictionary; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.1 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + */ +public class CarbonVectorProxy { --- End diff -- 2.1 and 2.2 should have same --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213689426 --- Diff: integration/spark-datasource/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.Dictionary; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + */ +public class CarbonVectorProxy { + + private ColumnarBatch columnarBatch; + private WritableColumnVector[] columnVectors; + + /** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNum rows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ + public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { + columnVectors = ColumnVectorFactory + .getColumnVector(memMode, new StructType(structFileds), rowNum); + columnarBatch = new ColumnarBatch(columnVectors); + columnarBatch.setNumRows(rowNum); + } + + public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { + columnVectors = ColumnVectorFactory + .getColumnVector(memMode, outputSchema, rowNum); + columnarBatch = new ColumnarBatch(columnVectors); + columnarBatch.setNumRows(rowNum); + } + + /** + * Returns the number of rows for read, including filtered rows. 
+ */ + public int numRows() { + return columnarBatch.numRows(); + } + + public Object reserveDictionaryIds(int capacity, int ordinal) { + return columnVectors[ordinal].reserveDictionaryIds(capacity); + } + + /** + * This API will return a columnvector from a batch of column vector rows + * based on the ordinal + * + * @param ordinal + * @return + */ + public WritableColumnVector column(int ordinal) { + return (WritableColumnVector) columnarBatch.column(ordinal); + } + + public WritableColumnVector getColumnVector(int ordinal) { + return columnVectors[ordinal]; + } + + /** + * Resets this column for writing. The currently stored values are no longer accessible. + */ + public void reset() { + for (WritableColumnVector col : columnVectors) { + col.reset(); + } + } + + public void resetDictionaryIds(int ordinal) { + columnVectors[ordinal].getDictionaryIds().reset(); + } + + /** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ + public InternalRow getRow(int rowId) { + return columnarBatch.getRow(rowId); + } + + + /** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ + public Object getColumnarBatch() { + return columnarBatch; + } + + /** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ + public void close() { + columnarBatch.close(); + } + + /** + * Sets the number of rows in this batch. + */ + public void setNumRows(int numRows) { + columnarBatch.setNumRows(numRows); + } + + public void putRowToColumnBatch(int rowId, Object value, int offset) { + org.apache.spark.sql.types.DataType t = dataType(offset); + if (null == value) { + putNull(rowId, offset); + } else { + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + putBoolean(rowId, (boolean) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + putByte(rowId, (byte) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + putShort(rowId, (short) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + putInt(rowId, (int) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + putLong(rowId, (long) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + putFloat(rowId, (float) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + putDouble(rowId, (double) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = (UTF8String) value; + putByteArray(rowId, v.getBytes(), offset); + } else if (t instanceof DecimalType) { + DecimalType dt = (DecimalType) t; + Decimal d = Decimal.fromDecimal(value); + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + putInt(rowId, (int) d.toUnscaledLong(), offset); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + putLong(rowId, d.toUnscaledLong(), offset); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + putByteArray(rowId, bytes, 0, bytes.length, offset); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = (CalendarInterval) value; + columnVectors[offset].getChild(0).putInt(rowId, c.months); + columnVectors[offset].getChild(1).putLong(rowId, c.microseconds); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + putInt(rowId, (int) value, offset); + } else if 
(t instanceof org.apache.spark.sql.types.TimestampType) { + putLong(rowId, (long) value, offset); + } + } + } + + public void putBoolean(int rowId, boolean value, int ordinal) { + columnVectors[ordinal].putBoolean(rowId, (boolean) value); + } + + public void putByte(int rowId, byte value, int ordinal) { + columnVectors[ordinal].putByte(rowId, (byte) value); + } + + public void putShort(int rowId, short value, int ordinal) { + columnVectors[ordinal].putShort(rowId, (short) value); + } + + public void putInt(int rowId, int value, int ordinal) { + columnVectors[ordinal].putInt(rowId, (int) value); + } + + public void putDictionaryInt(int rowId, int value, int ordinal) { + columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value); + } + + public void putFloat(int rowId, float value, int ordinal) { + columnVectors[ordinal].putFloat(rowId, (float) value); + } + + public void putLong(int rowId, long value, int ordinal) { + columnVectors[ordinal].putLong(rowId, (long) value); + } + + public void putDouble(int rowId, double value, int ordinal) { + columnVectors[ordinal].putDouble(rowId, (double) value); + } + + public void putByteArray(int rowId, byte[] value, int ordinal) { + columnVectors[ordinal].putByteArray(rowId, (byte[]) value); + } + + public void putInts(int rowId, int count, int value, int ordinal) { + columnVectors[ordinal].putInts(rowId, count, value); + } + + public void putShorts(int rowId, int count, short value, int ordinal) { + columnVectors[ordinal].putShorts(rowId, count, value); + } + + public void putLongs(int rowId, int count, long value, int ordinal) { + columnVectors[ordinal].putLongs(rowId, count, value); + } + + public void putDecimal(int rowId, Decimal value, int precision, int ordinal) { + columnVectors[ordinal].putDecimal(rowId, value, precision); + + } + + public void putDoubles(int rowId, int count, double value, int ordinal) { + columnVectors[ordinal].putDoubles(rowId, count, value); + } + + public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) { + columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length); + } + + public void putNull(int rowId, int ordinal) { + columnVectors[ordinal].putNull(rowId); + } + + public void putNulls(int rowId, int count, int ordinal) { + columnVectors[ordinal].putNulls(rowId, count); + } + + public void putNotNull(int rowId, int ordinal) { + columnVectors[ordinal].putNotNull(rowId); + } + + public void putNotNulls(int rowId, int count, int ordinal) { + columnVectors[ordinal].putNotNulls(rowId, count); + } + + public boolean isNullAt(int rowId, int ordinal) { + return columnVectors[ordinal].isNullAt(rowId); + } + + public boolean hasDictionary(int ordinal) { + return columnVectors[ordinal].hasDictionary(); + } + + public void setDictionary(Object dictionary, int ordinal) { --- End diff -- Please make sure we set correct dictionary. 2.3 does not expect org.apache.parquet.column.Dictionary --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213689912 --- Diff: integration/spark2/pom.xml --- @@ -276,6 +312,8 @@ <configuration> <excludes> <exclude>src/main/spark2.1</exclude> + <exclude>src/main/spark2.3</exclude> + <exclude>src/main/commonTo2.2And2.3</exclude> --- End diff -- I think there is no need to include these here. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213695426 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -700,3 +724,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } } } + +class CarbonPhysicalPlanException extends Exception { + +} --- End diff -- remove the empty braces --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2642 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8148/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213700919 --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.{DataType, StringType} + +/** + * Custom expression to override the deterministic property . + */ +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{ --- End diff -- I don't see any differences; why is it copied 3 times? --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2642 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/77/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2642 Please check the build failures of 2.3 CI http://136.243.101.176:8080/job/ManualApacheCarbonPRBuilder2.1/172/ --- |
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213902360 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala --- @@ -122,7 +122,7 @@ class CarbonAppendableStreamSink( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = fileLogPath, - isAppend = false) + false) --- End diff -- In 2.3 the `isAppend` parameter is renamed to `dynamicPartitionOverwrite`. --- |
In reply to this post by qiuchenjian-2
Github user sandeep-katta commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213903286 --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CustomDeterministicExpression.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.{DataType, StringType} + +/** + * Custom expression to override the deterministic property . + */ +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{ --- End diff -- In 2.1 and 2.2 it is `override def deterministic: Boolean = true`; in 2.3 it is `override lazy val deterministic: Boolean = true`. --- |
In reply to this post by qiuchenjian-2
Github user sujith71955 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2642#discussion_r213905161 --- Diff: integration/spark-datasource/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.Dictionary; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + */ +public class CarbonVectorProxy { + + private ColumnarBatch columnarBatch; + private WritableColumnVector[] columnVectors; + + /** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNum rows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ + public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { + columnVectors = ColumnVectorFactory + .getColumnVector(memMode, new StructType(structFileds), rowNum); + columnarBatch = new ColumnarBatch(columnVectors); + columnarBatch.setNumRows(rowNum); + } + + public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { + columnVectors = ColumnVectorFactory + .getColumnVector(memMode, outputSchema, rowNum); + columnarBatch = new ColumnarBatch(columnVectors); + columnarBatch.setNumRows(rowNum); + } + + /** + * Returns the number of rows for read, including filtered rows. 
+ */ + public int numRows() { + return columnarBatch.numRows(); + } + + public Object reserveDictionaryIds(int capacity, int ordinal) { + return columnVectors[ordinal].reserveDictionaryIds(capacity); + } + + /** + * This API will return a columnvector from a batch of column vector rows + * based on the ordinal + * + * @param ordinal + * @return + */ + public WritableColumnVector column(int ordinal) { + return (WritableColumnVector) columnarBatch.column(ordinal); + } + + public WritableColumnVector getColumnVector(int ordinal) { + return columnVectors[ordinal]; + } + + /** + * Resets this column for writing. The currently stored values are no longer accessible. + */ + public void reset() { + for (WritableColumnVector col : columnVectors) { + col.reset(); + } + } + + public void resetDictionaryIds(int ordinal) { + columnVectors[ordinal].getDictionaryIds().reset(); + } + + /** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ + public InternalRow getRow(int rowId) { + return columnarBatch.getRow(rowId); + } + + + /** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ + public Object getColumnarBatch() { + return columnarBatch; + } + + /** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ + public void close() { + columnarBatch.close(); + } + + /** + * Sets the number of rows in this batch. + */ + public void setNumRows(int numRows) { + columnarBatch.setNumRows(numRows); + } + + public void putRowToColumnBatch(int rowId, Object value, int offset) { + org.apache.spark.sql.types.DataType t = dataType(offset); + if (null == value) { + putNull(rowId, offset); + } else { + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + putBoolean(rowId, (boolean) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + putByte(rowId, (byte) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + putShort(rowId, (short) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + putInt(rowId, (int) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + putLong(rowId, (long) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + putFloat(rowId, (float) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + putDouble(rowId, (double) value, offset); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = (UTF8String) value; + putByteArray(rowId, v.getBytes(), offset); + } else if (t instanceof DecimalType) { + DecimalType dt = (DecimalType) t; + Decimal d = Decimal.fromDecimal(value); + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + putInt(rowId, (int) d.toUnscaledLong(), offset); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + putLong(rowId, d.toUnscaledLong(), offset); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + putByteArray(rowId, bytes, 0, bytes.length, offset); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = (CalendarInterval) value; + columnVectors[offset].getChild(0).putInt(rowId, c.months); + columnVectors[offset].getChild(1).putLong(rowId, c.microseconds); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + putInt(rowId, (int) value, offset); + } else if 
(t instanceof org.apache.spark.sql.types.TimestampType) { + putLong(rowId, (long) value, offset); + } + } + } + + public void putBoolean(int rowId, boolean value, int ordinal) { + columnVectors[ordinal].putBoolean(rowId, (boolean) value); + } + + public void putByte(int rowId, byte value, int ordinal) { + columnVectors[ordinal].putByte(rowId, (byte) value); + } + + public void putShort(int rowId, short value, int ordinal) { + columnVectors[ordinal].putShort(rowId, (short) value); + } + + public void putInt(int rowId, int value, int ordinal) { + columnVectors[ordinal].putInt(rowId, (int) value); + } + + public void putDictionaryInt(int rowId, int value, int ordinal) { + columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value); + } + + public void putFloat(int rowId, float value, int ordinal) { + columnVectors[ordinal].putFloat(rowId, (float) value); + } + + public void putLong(int rowId, long value, int ordinal) { + columnVectors[ordinal].putLong(rowId, (long) value); + } + + public void putDouble(int rowId, double value, int ordinal) { + columnVectors[ordinal].putDouble(rowId, (double) value); + } + + public void putByteArray(int rowId, byte[] value, int ordinal) { + columnVectors[ordinal].putByteArray(rowId, (byte[]) value); + } + + public void putInts(int rowId, int count, int value, int ordinal) { + columnVectors[ordinal].putInts(rowId, count, value); + } + + public void putShorts(int rowId, int count, short value, int ordinal) { + columnVectors[ordinal].putShorts(rowId, count, value); + } + + public void putLongs(int rowId, int count, long value, int ordinal) { + columnVectors[ordinal].putLongs(rowId, count, value); + } + + public void putDecimal(int rowId, Decimal value, int precision, int ordinal) { + columnVectors[ordinal].putDecimal(rowId, value, precision); + + } + + public void putDoubles(int rowId, int count, double value, int ordinal) { + columnVectors[ordinal].putDoubles(rowId, count, value); + } + + public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) { + columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length); + } + + public void putNull(int rowId, int ordinal) { + columnVectors[ordinal].putNull(rowId); + } + + public void putNulls(int rowId, int count, int ordinal) { + columnVectors[ordinal].putNulls(rowId, count); + } + + public void putNotNull(int rowId, int ordinal) { + columnVectors[ordinal].putNotNull(rowId); + } + + public void putNotNulls(int rowId, int count, int ordinal) { + columnVectors[ordinal].putNotNulls(rowId, count); + } + + public boolean isNullAt(int rowId, int ordinal) { + return columnVectors[ordinal].isNullAt(rowId); + } + + public boolean hasDictionary(int ordinal) { + return columnVectors[ordinal].hasDictionary(); + } + + public void setDictionary(Object dictionary, int ordinal) { --- End diff -- Yes, the proxy layers for Spark 2.3 and for Spark 2.1/2.2 each set the appropriate dictionary instance. --- |