Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6336/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6709/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7985/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6340/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7989/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250447 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java --- @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) { * @param path carbon file path */ public LatestFilesReadCommittedScope(String path) { - this(path, null); + this(path, (String) null); + } + + /** + * a new constructor with path + * + * @param path carbon file path + */ + public LatestFilesReadCommittedScope(String path, String[] subFolders) { + Objects.requireNonNull(path); + this.carbonFilePath = path; + this.subFolders = subFolders; + try { + takeCarbonIndexFileSnapShot(); + } catch (IOException ex) { + throw new RuntimeException("Error while taking index snapshot", ex); --- End diff -- Not required handling here, so removed. --- |
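For context on the quoted diff: once the new LatestFilesReadCommittedScope(String path, String[] subFolders) constructor exists alongside (String path, String segmentId), a bare null second argument no longer resolves to a single overload, which is why the one-argument constructor now passes (String) null. A minimal calling sketch, not from the PR; the store path and folder names below are made up:

    import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope

    // Hedged sketch: illustrative path and folder names only.
    val path = "/tmp/carbon/output"
    // segmentId overload; the explicit String type keeps the call unambiguous
    // now that a String[] overload also accepts null
    val latestScope = new LatestFilesReadCommittedScope(path, null.asInstanceOf[String])
    // new subFolders overload added by this PR
    val folderScope = new LatestFilesReadCommittedScope(path, Array("part1", "part2"))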
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250488 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java --- @@ -36,12 +36,14 @@ */ @InterfaceAudience.Internal @InterfaceStability.Stable -public class LatestFilesReadCommittedScope implements ReadCommittedScope { +public class LatestFilesReadCommittedScope + implements ReadCommittedScope { --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250514 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java --- @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) { * @param path carbon file path */ public LatestFilesReadCommittedScope(String path) { - this(path, null); + this(path, (String) null); + } + + /** + * a new constructor with path + * + * @param path carbon file path + */ --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250560 --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java --- @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) { * @param path carbon file path */ public LatestFilesReadCommittedScope(String path) { - this(path, null); + this(path, (String) null); + } + + /** + * a new constructor with path + * + * @param path carbon file path + */ + public LatestFilesReadCommittedScope(String path, String[] subFolders) { + Objects.requireNonNull(path); + this.carbonFilePath = path; + this.subFolders = subFolders; --- End diff -- ok --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6713/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6349/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7997/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6351/ --- |
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212305587 --- Diff: integration/spark-datasource/pom.xml --- @@ -0,0 +1,279 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.5.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-spark-datasource</artifactId> + <name>Apache CarbonData :: Spark Datasource</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + <jacoco.append>true</jacoco.append> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-hadoop</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-sdk</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <resources> + <resource> + <directory>src/resources</directory> + </resource> + <resource> + <directory>.</directory> + <includes> + <include>CARBON_SPARK_INTERFACELogResource.properties</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + 
<id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> + </systemProperties> + <failIfNoTests>false</failIfNoTests> + </configuration> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <!-- Note config is repeated in surefire config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>CarbonTestSuite.txt</filereports> + <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <stderr /> + <environmentVariables> + </environmentVariables> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>build-all</id> + <properties> + <spark.version>2.2.1</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + </profile> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + <profile> + <id>spark-2.1</id> + <properties> + <spark.version>2.1.0</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/spark2.2</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/spark2.1</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>spark-2.2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <spark.version>2.2.1</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> --- End diff -- I think profiles are not 
required --- |
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212314571 --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.carbondata.execution.datasources + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField} +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} +import org.apache.carbondata.core.scan.expression.conditional._ +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema} + +object CarbonSparkDataSourceUtil { + + /** + * Convert from carbon datatype to sparks datatype + */ + def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = { + if (CarbonDataTypes.isDecimal(dataType)) { + DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision, + dataType.asInstanceOf[CarbonDecimalType].getScale) + } else { + dataType match { + case CarbonDataTypes.STRING => StringType + case CarbonDataTypes.SHORT => ShortType + case CarbonDataTypes.INT => IntegerType + case CarbonDataTypes.LONG => LongType + case CarbonDataTypes.DOUBLE => DoubleType + case CarbonDataTypes.BOOLEAN => BooleanType + case CarbonDataTypes.TIMESTAMP => TimestampType + case CarbonDataTypes.DATE => DateType + case CarbonDataTypes.VARCHAR => StringType + } + } + } + + /** + * Convert from sparks datatype to carbon datatype + */ + def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { + dataType match { + case StringType => CarbonDataTypes.STRING + case ShortType => CarbonDataTypes.SHORT + case IntegerType => CarbonDataTypes.INT + case LongType => CarbonDataTypes.LONG + case DoubleType => CarbonDataTypes.DOUBLE + case FloatType => CarbonDataTypes.FLOAT + case DateType => CarbonDataTypes.DATE + case BooleanType => CarbonDataTypes.BOOLEAN + case TimestampType => CarbonDataTypes.TIMESTAMP + case ArrayType(elementType, _) => + CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType)) + case StructType(fields) => + val carbonFields = new java.util.ArrayList[CarbonStructField] 
+ fields.map { field => + carbonFields.add( + new CarbonStructField( + field.name, + convertSparkToCarbonDataType(field.dataType))) + } + CarbonDataTypes.createStructType(carbonFields) + case NullType => CarbonDataTypes.NULL + case decimal: DecimalType => + CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale) + case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark") + } + } + + /** + * Converts data sources filters to carbon filter predicates. + */ + def createCarbonFilter(schema: StructType, + predicate: sources.Filter): Option[CarbonExpression] = { + val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + + def createFilter(predicate: sources.Filter): Option[CarbonExpression] = { + predicate match { + + case sources.EqualTo(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualTo(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.EqualNullSafe(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualNullSafe(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThan(name, value) => + Some(new GreaterThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThan(name, value) => + Some(new LessThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThanOrEqual(name, value) => + Some(new GreaterThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThanOrEqual(name, value) => + Some(new LessThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.In(name, values) => + if (values.length == 1 && values(0) == null) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new InExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.filterNot(_ == null) + .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList)))) + } + case sources.Not(sources.In(name, values)) => + if (values.contains(null)) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new NotInExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) + } + case sources.IsNull(name) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.IsNotNull(name) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.And(lhs, rhs) => + (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _)) + case sources.Or(lhs, rhs) => + for { + lhsFilter <- createFilter(lhs) + rhsFilter <- createFilter(rhs) + } yield { + new OrExpression(lhsFilter, rhsFilter) + } + case sources.StringStartsWith(name, value) if value.length > 0 => + Some(new StartsWithExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case _ => None + } + } + + def getCarbonExpression(name: String) = { --- End diff -- Remove duplicate methods (CarbonFilters also has) --- |
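To make the role of the quoted utility concrete, here is a small usage sketch of createCarbonFilter, mirroring how the file format elsewhere in this PR folds the pushed-down filters into a single AndExpression; the schema and filters are illustrative only:

    import org.apache.spark.sql.sources
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
    import org.apache.carbondata.core.scan.expression.logical.AndExpression

    // Illustrative schema and pushed-down Spark filters
    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val pushed = Seq(sources.GreaterThan("id", 100), sources.IsNotNull("name"))

    // Unsupported filters map to None and drop out; the rest are folded into one expression
    val carbonFilter = pushed
      .flatMap(f => CarbonSparkDataSourceUtil.createCarbonFilter(schema, f))
      .reduceOption(new AndExpression(_, _))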
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212317536 --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.carbondata.execution.datasources + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField} +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} +import org.apache.carbondata.core.scan.expression.conditional._ +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema} + +object CarbonSparkDataSourceUtil { + + /** + * Convert from carbon datatype to sparks datatype + */ + def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = { + if (CarbonDataTypes.isDecimal(dataType)) { + DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision, + dataType.asInstanceOf[CarbonDecimalType].getScale) + } else { + dataType match { + case CarbonDataTypes.STRING => StringType + case CarbonDataTypes.SHORT => ShortType + case CarbonDataTypes.INT => IntegerType + case CarbonDataTypes.LONG => LongType + case CarbonDataTypes.DOUBLE => DoubleType + case CarbonDataTypes.BOOLEAN => BooleanType + case CarbonDataTypes.TIMESTAMP => TimestampType + case CarbonDataTypes.DATE => DateType + case CarbonDataTypes.VARCHAR => StringType + } + } + } + + /** + * Convert from sparks datatype to carbon datatype + */ + def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { + dataType match { + case StringType => CarbonDataTypes.STRING + case ShortType => CarbonDataTypes.SHORT + case IntegerType => CarbonDataTypes.INT + case LongType => CarbonDataTypes.LONG + case DoubleType => CarbonDataTypes.DOUBLE + case FloatType => CarbonDataTypes.FLOAT + case DateType => CarbonDataTypes.DATE + case BooleanType => CarbonDataTypes.BOOLEAN + case TimestampType => CarbonDataTypes.TIMESTAMP + case ArrayType(elementType, _) => + CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType)) + case StructType(fields) => + val carbonFields = new java.util.ArrayList[CarbonStructField] 
+ fields.map { field => + carbonFields.add( + new CarbonStructField( + field.name, + convertSparkToCarbonDataType(field.dataType))) + } + CarbonDataTypes.createStructType(carbonFields) + case NullType => CarbonDataTypes.NULL + case decimal: DecimalType => + CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale) + case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark") + } + } + + /** + * Converts data sources filters to carbon filter predicates. + */ + def createCarbonFilter(schema: StructType, + predicate: sources.Filter): Option[CarbonExpression] = { + val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + + def createFilter(predicate: sources.Filter): Option[CarbonExpression] = { + predicate match { + + case sources.EqualTo(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualTo(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.EqualNullSafe(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualNullSafe(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThan(name, value) => + Some(new GreaterThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThan(name, value) => + Some(new LessThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThanOrEqual(name, value) => + Some(new GreaterThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThanOrEqual(name, value) => + Some(new LessThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.In(name, values) => + if (values.length == 1 && values(0) == null) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new InExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.filterNot(_ == null) + .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList)))) + } + case sources.Not(sources.In(name, values)) => + if (values.contains(null)) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new NotInExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) + } + case sources.IsNull(name) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.IsNotNull(name) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.And(lhs, rhs) => + (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _)) + case sources.Or(lhs, rhs) => + for { + lhsFilter <- createFilter(lhs) + rhsFilter <- createFilter(rhs) + } yield { + new OrExpression(lhsFilter, rhsFilter) + } + case sources.StringStartsWith(name, value) if value.length > 0 => + Some(new StartsWithExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case _ => None + } + } + + def getCarbonExpression(name: String) = { + new CarbonColumnExpression(name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))) + } + + def 
getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = { + val dataTypeOfAttribute = + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)) + val dataType = + if (Option(value).isDefined && + dataTypeOfAttribute == CarbonDataTypes.STRING && + value.isInstanceOf[Double]) { + CarbonDataTypes.DOUBLE + } else { + dataTypeOfAttribute + } + new CarbonLiteralExpression(value, dataType) + } + + createFilter(predicate) + } + + // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is + // not able find the classes inside scala list and gives ClassNotFoundException. + private def convertToJavaList( + scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = { + val javaList = new java.util.ArrayList[CarbonExpression]() + scalaList.foreach(javaList.add) + javaList + } + + /** + * Create load model for carbon + */ + def prepareLoadModel(options: Map[String, String], + dataSchema: StructType): CarbonLoadModel = { + val schema = new Schema(dataSchema.fields.map { field => + field.dataType match { + case s: StructType => + new Field(field.name, + field.dataType.typeName, + s.fields + .map(f => new datatype.StructField(f.name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava) + case a: ArrayType => + new Field(field.name, + field.dataType.typeName, + Seq(new datatype.StructField(field.name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava) + case other => + new Field(field.name, field.dataType.simpleString) + } + }) + val builder = new CarbonWriterBuilder + builder.isTransactionalTable(false) + builder.outputPath(options.getOrElse("path", "")) + val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt) + if (blockSize.isDefined) { + builder.withBlockSize(blockSize.get) + } + val blockletSize = options.get("table_blockletsize").map(_.toInt) + if (blockletSize.isDefined) { + builder.withBlockletSize(blockletSize.get) + } + builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean) + builder.localDictionaryThreshold( + options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, + CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt) + builder.sortBy( + options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull) + builder.isTransactionalTable(false) --- End diff -- duplicate line --- |
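On the duplicate line flagged above: a trimmed sketch of the same CarbonWriterBuilder setup from prepareLoadModel with isTransactionalTable(false) set only once; the options map is whatever the datasource passes in:

    import org.apache.carbondata.sdk.file.CarbonWriterBuilder

    // Sketch of the quoted builder setup with the duplicated call removed.
    def configureBuilder(options: Map[String, String]): CarbonWriterBuilder = {
      val builder = new CarbonWriterBuilder
      builder.outputPath(options.getOrElse("path", ""))
      builder.isTransactionalTable(false)                  // set once, not twice
      builder.uniqueIdentifier(System.currentTimeMillis())
      builder
    }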
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212318460 --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.carbondata.execution.datasources + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField} +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} +import org.apache.carbondata.core.scan.expression.conditional._ +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema} + +object CarbonSparkDataSourceUtil { + + /** + * Convert from carbon datatype to sparks datatype + */ + def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = { + if (CarbonDataTypes.isDecimal(dataType)) { + DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision, + dataType.asInstanceOf[CarbonDecimalType].getScale) + } else { + dataType match { + case CarbonDataTypes.STRING => StringType + case CarbonDataTypes.SHORT => ShortType + case CarbonDataTypes.INT => IntegerType + case CarbonDataTypes.LONG => LongType + case CarbonDataTypes.DOUBLE => DoubleType + case CarbonDataTypes.BOOLEAN => BooleanType + case CarbonDataTypes.TIMESTAMP => TimestampType + case CarbonDataTypes.DATE => DateType + case CarbonDataTypes.VARCHAR => StringType + } + } + } + + /** + * Convert from sparks datatype to carbon datatype + */ + def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { + dataType match { + case StringType => CarbonDataTypes.STRING + case ShortType => CarbonDataTypes.SHORT + case IntegerType => CarbonDataTypes.INT + case LongType => CarbonDataTypes.LONG + case DoubleType => CarbonDataTypes.DOUBLE + case FloatType => CarbonDataTypes.FLOAT + case DateType => CarbonDataTypes.DATE + case BooleanType => CarbonDataTypes.BOOLEAN + case TimestampType => CarbonDataTypes.TIMESTAMP + case ArrayType(elementType, _) => + CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType)) + case StructType(fields) => + val carbonFields = new java.util.ArrayList[CarbonStructField] 
+ fields.map { field => + carbonFields.add( + new CarbonStructField( + field.name, + convertSparkToCarbonDataType(field.dataType))) + } + CarbonDataTypes.createStructType(carbonFields) + case NullType => CarbonDataTypes.NULL + case decimal: DecimalType => + CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale) + case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark") + } + } + + /** + * Converts data sources filters to carbon filter predicates. + */ + def createCarbonFilter(schema: StructType, + predicate: sources.Filter): Option[CarbonExpression] = { + val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + + def createFilter(predicate: sources.Filter): Option[CarbonExpression] = { + predicate match { + + case sources.EqualTo(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualTo(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.EqualNullSafe(name, value) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.Not(sources.EqualNullSafe(name, value)) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThan(name, value) => + Some(new GreaterThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThan(name, value) => + Some(new LessThanExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.GreaterThanOrEqual(name, value) => + Some(new GreaterThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.LessThanOrEqual(name, value) => + Some(new LessThanEqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case sources.In(name, values) => + if (values.length == 1 && values(0) == null) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new InExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.filterNot(_ == null) + .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList)))) + } + case sources.Not(sources.In(name, values)) => + if (values.contains(null)) { + Some(new FalseExpression(getCarbonExpression(name))) + } else { + Some(new NotInExpression(getCarbonExpression(name), + new ListExpression( + convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) + } + case sources.IsNull(name) => + Some(new EqualToExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.IsNotNull(name) => + Some(new NotEqualsExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, null), true)) + case sources.And(lhs, rhs) => + (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _)) + case sources.Or(lhs, rhs) => + for { + lhsFilter <- createFilter(lhs) + rhsFilter <- createFilter(rhs) + } yield { + new OrExpression(lhsFilter, rhsFilter) + } + case sources.StringStartsWith(name, value) if value.length > 0 => + Some(new StartsWithExpression(getCarbonExpression(name), + getCarbonLiteralExpression(name, value))) + case _ => None + } + } + + def getCarbonExpression(name: String) = { + new CarbonColumnExpression(name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))) + } + + def 
getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = { + val dataTypeOfAttribute = + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)) + val dataType = + if (Option(value).isDefined && + dataTypeOfAttribute == CarbonDataTypes.STRING && + value.isInstanceOf[Double]) { + CarbonDataTypes.DOUBLE + } else { + dataTypeOfAttribute + } + new CarbonLiteralExpression(value, dataType) + } + + createFilter(predicate) + } + + // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is + // not able find the classes inside scala list and gives ClassNotFoundException. + private def convertToJavaList( + scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = { + val javaList = new java.util.ArrayList[CarbonExpression]() + scalaList.foreach(javaList.add) + javaList + } + + /** + * Create load model for carbon + */ + def prepareLoadModel(options: Map[String, String], + dataSchema: StructType): CarbonLoadModel = { + val schema = new Schema(dataSchema.fields.map { field => + field.dataType match { + case s: StructType => + new Field(field.name, + field.dataType.typeName, + s.fields + .map(f => new datatype.StructField(f.name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava) + case a: ArrayType => + new Field(field.name, + field.dataType.typeName, + Seq(new datatype.StructField(field.name, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava) + case other => + new Field(field.name, field.dataType.simpleString) + } + }) + val builder = new CarbonWriterBuilder + builder.isTransactionalTable(false) + builder.outputPath(options.getOrElse("path", "")) + val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt) + if (blockSize.isDefined) { + builder.withBlockSize(blockSize.get) + } + val blockletSize = options.get("table_blockletsize").map(_.toInt) + if (blockletSize.isDefined) { + builder.withBlockletSize(blockletSize.get) + } + builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean) + builder.localDictionaryThreshold( + options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, + CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt) + builder.sortBy( + options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull) + builder.isTransactionalTable(false) + builder.uniqueIdentifier(System.currentTimeMillis()) --- End diff -- Support column_meta_cache, cache_level also --- |
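On supporting column_meta_cache and cache_level: the PR does not show how these would reach CarbonWriterBuilder, so the sketch below only reads them from the datasource options map, nothing more:

    // Hedged sketch: column_meta_cache takes a comma-separated column list and
    // cache_level takes BLOCK or BLOCKLET; passing them on to the builder is not
    // shown in this PR, so no builder call is assumed here.
    def readCachingOptions(options: Map[String, String]): (Option[String], Option[String]) =
      (options.get("column_meta_cache"), options.get("cache_level"))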
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6722/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7999/ --- |
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212329045 --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala --- @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.carbondata.execution.datasources + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql._ +import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SparkTypeConverter +import org.apache.spark.util.SerializableConfiguration + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.BlockletDetailInfo +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion} +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression} +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable +import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject} +import 
org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader + +/** + * Used to read and write data stored in carbondata files to/from the spark execution engine. + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +class SparkCarbonFileFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + /** + * If user does not provide schema while reading the data then spark calls this method to infer + * schema from the carbodata files. It reads the schema present in carbondata files and return it. + */ + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val tablePath = options.get("path") match { + case Some(path) => path + case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString) + } + + val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false) + val table = CarbonTable.buildFromTableInfo(tableInfo) + var schema = new StructType + tableInfo.getFactTable.getListOfColumns.asScala.foreach { col => + // TODO find better way to know its a child + if (!col.getColumnName.contains(".")) { + schema = schema.add( + col.getColumnName, + SparkTypeConverter.convertCarbonToSparkDataType(col, table)) + } + } + Some(schema) + } + + + /** + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation is + * done here. + */ + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + + val conf = job.getConfiguration + + val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema) + model.setLoadWithoutConverterStep(true) + CarbonTableOutputFormat.setLoadModel(conf, model) + + new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) { + new Path(path).getParent.toString + } else { + path + } + context.getConfiguration.set("carbon.outputformat.writepath", updatedPath) + context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "") + new CarbonOutputWriter(path, context, dataSchema.fields) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CarbonTablePath.CARBON_DATA_EXT + } + } + } + + /** + * It is a just class to make compile between spark 2.1 and 2.2 + */ + private trait AbstractCarbonOutputWriter { + def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") + def writeInternal(row: InternalRow): Unit = { + writeCarbon(row) + } + def write(row: InternalRow): Unit = { + writeCarbon(row) + } + def writeCarbon(row: InternalRow): Unit + } + + + /** + * Writer class for carbondata files + */ + private class CarbonOutputWriter(path: String, + context: TaskAttemptContext, + fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter { + + private val writable = new ObjectArrayWritable + + private val cutOffDate = Integer.MAX_VALUE >> 1 + + private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] = + new CarbonTableOutputFormat().getRecordWriter(context) + + /** + * Write sparks internal row to carbondata record writer + */ + def writeCarbon(row: InternalRow): Unit = { + val data: Array[AnyRef] = extractData(row, 
fieldTypes) + writable.set(data) + recordWriter.write(NullWritable.get(), writable) + } + + override def writeInternal(row: InternalRow): Unit = { + writeCarbon(row) + } + + /** + * Convert the internal row to carbondata understandable object + */ + private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = { + val data = new Array[AnyRef](fieldTypes.length) + var i = 0 + while (i < fieldTypes.length) { + if (!row.isNullAt(i)) { + fieldTypes(i).dataType match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case s: StructType => + data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields)) + case s: ArrayType => + data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType)) + case d: DateType => + data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef] + case d: TimestampType => + data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef] + case other => + data(i) = row.get(i, other) + } + } else { + setNull(fieldTypes(i).dataType, data, i) + } + i += 1 + } + data + } + + private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = { + dataType match { + case d: DateType => + // 1 as treated as null in carbon + data(i) = 1.asInstanceOf[AnyRef] + case _ => + } + } + + /** + * Convert the internal row to carbondata understandable object + */ + private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = { + val data = new Array[AnyRef](row.numElements()) + var i = 0 + while (i < data.length) { + if (!row.isNullAt(i)) { + dataType match { + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case s: StructType => + data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields)) + case s: ArrayType => + data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType)) + case d: DateType => + data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef] + case other => data(i) = row.get(i, dataType) + } + } else { + setNull(dataType, data, i) + } + i += 1 + } + data + } + + override def close(): Unit = { + recordWriter.close(context) + } + } + + override def shortName(): String = "carbon" + + override def toString: String = "carbon" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat] + + /** + * Whether to support vector reader while reading data. + * In case of complex types it is not required to support it + */ + private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = { + val vectorizedReader = { + if (sparkSession.sqlContext.sparkSession.conf + .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) { + sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER) + } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) { + System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) + } else { + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + } + vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + + /** + * Returns whether this format support returning columnar batch or not. 
+ */ + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + val conf = sparkSession.sessionState.conf + conf.wholeStageEnabled && + schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + /** + * Returns a function that can be used to read a single carbondata file in as an + * Iterator of InternalRow. + */ + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val filter: Option[CarbonExpression] = filters.flatMap { filter => + CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter) + }.reduceOption(new AndExpression(_, _)) + + val projection = requiredSchema.map(_.name).toArray + val carbonProjection = new CarbonProjection + projection.foreach(carbonProjection.addColumn) + + var supportBatchValue: Boolean = false + + val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) + val readVector = supportVector(sparkSession, resultSchema) + if (readVector) { + supportBatchValue = supportBatch(sparkSession, resultSchema) + } + val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema) + CarbonInputFormat + .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) + CarbonInputFormat.setTransactionalTable(hadoopConf, false) + CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection) + filter match { + case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c) + case None => None + } + val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object] + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + file: PartitionedFile => { + assert(file.partitionValues.numFields == partitionSchema.size) + + if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) || --- End diff -- Can we use just check with carbondata extn? --- |
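On the last question about checking only the carbondata extension: a small sketch of that simpler check; the helper name is illustrative, not from the PR:

    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Keep only data files by their .carbondata extension instead of
    // excluding index and merge files by name.
    def isCarbonDataFile(filePath: String): Boolean =
      filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)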