GitHub user sounakr opened a pull request:
https://github.com/apache/carbondata/pull/1995 [WIP] File Format Reader

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done. Please provide details on:
  - Whether new unit test cases have been added or why no new tests are required?
  - How it is tested? Please attach the test report.
  - Is it a performance-related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sounakr/incubator-carbondata store

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/1995.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1995

----
commit 9f547aecbdac5cc2917bfbfa64a3a3c6100b7103
Author: sounakr <sounakr@...>
Date:   2018-02-24T02:25:14Z

    File Format Reader
----
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2613/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1995 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3640/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3858/
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170408142

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -0,0 +1,940 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format of CarbonData file.
+ *
+ * @param <T>
+ */
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
--- End diff --

Use InterfaceAudience and InterfaceStability to annotate all newly added public classes. Please follow this in the future.
---
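A minimal sketch of the annotation convention the reviewer is asking for, assuming Hadoop-style annotation classes from `org.apache.hadoop.classification` (CarbonData may define its own equivalents in a common module; that package choice is an assumption here):

    import java.io.Serializable;

    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Marks the new public class with explicit audience/stability metadata,
    // so downstream users know whether the API is safe to depend on.
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T>
        implements Serializable {

      @Override
      public RecordReader<Void, T> createRecordReader(InputSplit split,
          TaskAttemptContext context) {
        throw new UnsupportedOperationException("sketch only");
      }
    }

Annotating at the class level makes compatibility expectations explicit: `Evolving` signals that the API may still change between minor releases.
---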
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170408178

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala ---
@@ -30,14 +30,14 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS origin")
     // create carbon table and insert data
-    sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'")
-    sql("INSERT INTO origin select 100,'spark'")
-    sql("INSERT INTO origin select 200,'hive'")
+    sql("CREATE TABLE origin(name String, age int) STORED BY 'carbondata'")
--- End diff --

Why is this modified?
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170408254

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -0,0 +1,940 @@
(same hunk as quoted in the previous comment: the license header, imports, and the class declaration)
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
--- End diff --

I think you can reuse some functions from CarbonTableInputFormat. Can you make a parent class and extend it?
---
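A hedged sketch of the suggested refactor: the plumbing shared by both readers moves into an abstract parent, and each subclass keeps only its own split-discovery strategy. The parent's name, the helper method, and the configuration key below are illustrative assumptions, not the code as merged:

    import java.io.Serializable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Common parent: logic needed by both table-level and file-level readers
    // lives here exactly once (filter deserialization, projection handling, ...).
    abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T>
        implements Serializable {

      // Example of shared plumbing: read the pushed-down filter back out of
      // the job configuration ("carbon.filter.expression" is a made-up key).
      protected String getFilterPredicates(Configuration conf) {
        return conf.get("carbon.filter.expression");
      }
    }

    // Table-level reader: resolves valid/streaming segments via table status.
    class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
      @Override
      public RecordReader<Void, T> createRecordReader(InputSplit split,
          TaskAttemptContext context) {
        throw new UnsupportedOperationException("sketch only");
      }
    }

    // File-level reader: lists *.carbondata files directly, no table status.
    class CarbonFileInputFormat<T> extends CarbonInputFormat<T> {
      @Override
      public RecordReader<Void, T> createRecordReader(InputSplit split,
          TaskAttemptContext context) {
        throw new UnsupportedOperationException("sketch only");
      }
    }

With this shape, a fix to the shared filter or projection handling lands in one place instead of being patched in both input formats.
---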
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170408310

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileLevelFormat.scala ---
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.codec.CodecConfig
+import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetLogRedirector, ParquetOutputWriter, ParquetReadSupport, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter}
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter, RelationProvider}
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StructField, StructType}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.reader.CarbonHeaderReader
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.scan.model.QueryModel
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, TaskMetricsMap, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat, DataMapJob}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.rdd.{CarbonSparkPartition, SparkDataMapJob}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+
+class CarbonFileLevelFormat extends FileFormat
--- End diff --

What about the current `CarbonFileFormat`? Is it renamed to `CarbonTableLevelFormat`?
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170408521

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala ---
@@ -62,6 +63,31 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
     assert(new File(originDataPath).exists())
   }
 
+
+  test("create external table with existing files2") {
--- End diff --

Suggest considering more test cases in this PR or a follow-up PR (a hedged sketch of case 1 follows below):
1. Use the SDK to write data and read it with CarbonFileInputFormat through MapReduce.
2. Use the SDK to write data and read it with CarbonFileInputFormat through Spark SQL.
3. Use the SDK to write data without schema persistence, and read it with CarbonFileInputFormat through Spark SQL.
4. Use the SDK to write data to the same folder multiple times, without schema persistence, and read it with CarbonFileInputFormat through Spark SQL (to mimic multiple loads).
5. Different sorting configurations of 1-4.
6. Use CarbonTableOutputFormat to write data, and read it with CarbonFileInputFormat using the segment folder.
7. Partition-related tests may need to be added after #1984 is merged.
---
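A minimal sketch of the read side of case 1: wiring CarbonFileInputFormat into a plain MapReduce job. Only standard Hadoop APIs appear below; the Carbon-specific projection/filter setters are omitted because their exact signatures are not shown in this PR, the row value is treated as an opaque Object, and whether the WIP reader works without further configuration is an open assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;

    public class CarbonFileReadJob {

      // CarbonFileInputFormat emits <Void, T>; the concrete row type depends
      // on the configured read support, so Object is used for illustration.
      static class PrintMapper extends Mapper<Void, Object, NullWritable, Text> {
        @Override
        protected void map(Void key, Object value, Context context)
            throws java.io.IOException, InterruptedException {
          context.write(NullWritable.get(), new Text(String.valueOf(value)));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "read carbondata files");
        job.setJarByClass(CarbonFileReadJob.class);
        // Point the file-level reader at the folder the SDK wrote to.
        job.setInputFormatClass(CarbonFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(PrintMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Cases 2-4 would replace this MapReduce driver with a Spark SQL read against the same folder, and case 6 swaps the write side to CarbonTableOutputFormat while keeping this read side.
---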
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1995#discussion_r170412437

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -353,7 +353,53 @@ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configur
     List<String> validSegments = segments.getValidSegments();
     streamSegments = segments.getStreamSegments();
     if (validSegments.size() == 0) {
-      return getSplitsOfStreaming(job, identifier, streamSegments);
+      if (streamSegments.size() != 0) {
+        return getSplitsOfStreaming(job, identifier, streamSegments);
+      }
+      // check for externalTable segment (Segment_null)
+      {
+        // process and resolve the expression
+        Expression filter = getFilterPredicates(job.getConfiguration());
+        TableProvider tableProvider = new SingleTableProvider(carbonTable);
+        // this will be null in case of corrupt schema file.
+        PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+        CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
+
+        // prune partitions for filter query on partition table
--- End diff --

These code changes are not required, because we already handle this logic in **CarbonFileInputFormat**. This logic gets the splits at the file level, but this class is a table-level reader, so there should not be any change in this class for this requirement.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3902/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2657/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1995 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3680/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2677/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3921/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2678/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3922/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2682/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1995 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3927/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1995 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3695/
---