Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174212055 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileLevelFormat.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +class CarbonFileLevelFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val filePaths = CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) + if (filePaths.size() == 0) { + throw new SparkException("CarbonData file is not present in the 
location mentioned in DDL") + } + val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0)) + val fileHeader = carbonHeaderReader.readHeader + val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader + .getColumn_schema + var colArray = ArrayBuffer[StructField]() + for (i <- 0 to table_columns.size() - 1) { + val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)) + colArray += (new StructField(col.getColumnName, + CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false)) + } + colArray.+:(Nil) + + Some(StructType(colArray)) + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + + new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new TextOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CarbonTablePath.CARBON_DATA_EXT + } + } + } + + override def shortName(): String = "CarbonDataFileFormat" --- End diff -- change the shortName to `carbonfile`. So, `carbondata` is for table level and `carbonfile` is for file level --- |
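A minimal sketch of the registration this comment asks for (the class name here is illustrative, not the one in the PR); `DataSourceRegister` requires only `shortName`, so the snippet stands alone:

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Suggested naming: the file-level source registers as "carbonfile",
// while the existing table-level source keeps "carbondata".
class CarbonFileRegister extends DataSourceRegister {
  override def shortName(): String = "carbonfile"
}
```

Spark resolves `format("carbonfile")` to the registered class through the `META-INF/services` entry for `DataSourceRegister`, so the lower-case name becomes the user-facing identifier.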
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174212571 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileLevelFormat.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +class CarbonFileLevelFormat extends FileFormat --- End diff -- Please rename current `CarbonFileFormat` class to `CarbonTableLevelFormat` --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2055 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3017/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2055 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4261/ --- |
Github user sounakr commented on the issue:
https://github.com/apache/carbondata/pull/2055 Retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2055 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3114/ --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174667420 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import 
org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.security.TokenCache; + +/** + * Input format of CarbonData file. + * + * @param <T> + */ +public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable { --- End diff -- ok. Fixed --- |
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2055 @jackylk: All the review comments have been addressed, and the branch has been rebased with master again. Please check the commit below. https://github.com/apache/carbondata/pull/2055/commits/b530adf2071113a75bc8e982ea3bd934e971b650 For inferSchema, only the renaming was done; it cannot be moved to the table-path level now because it depends on the identifier. --- |
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2055 Retest this please... I cannot see the Spark 2.1 report. --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2055 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4348/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174689415 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + // create carbon table and insert data + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //new provider Carbonfile + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("Describe formatted sdkOutputTable").show(false) --- End diff -- can you assert the result instead of show --- |
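A sketch of the assertion style being requested, assuming the 100 `robot0`..`robot99` rows written by `buildTestData` above; `checkAnswer` is the helper from the `QueryTest` base class these suites already extend:

```scala
import org.apache.spark.sql.Row

// Assert on query results instead of printing them with show(); the
// expected values assume the 100-row data set written in buildTestData.
checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(100)))
checkAnswer(
  sql("select name from sdkOutputTable where name = 'robot3'"),
  Seq(Row("robot3")))
```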
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174690041 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + // create carbon table and insert data + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //new provider Carbonfile + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200, false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false) + + sql("select count(*) from sdkOutputTable").show(200, false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("should not allow to alter datasource carbontable ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + val exception = intercept[MalformedCarbonCommandException] + { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage() + .contains("Unsupported alter operation on Carbon external fileformat 
table")) --- End diff -- Change to `Unsupported alter operation on 'carbonfile' table` --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174690144 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + // create carbon table and insert data + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //new provider Carbonfile + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200, false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false) + + sql("select count(*) from sdkOutputTable").show(200, false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("should not allow to alter datasource carbontable ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + val exception = intercept[MalformedCarbonCommandException] + { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage() + .contains("Unsupported alter operation on Carbon external fileformat 
table")) --- End diff -- Please use term `'carbonfile'` instead of external table --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174690840 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + // create carbon table and insert data + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //new provider Carbonfile + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200, false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false) + + sql("select count(*) from sdkOutputTable").show(200, false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("should not allow to alter datasource carbontable ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + val exception = intercept[MalformedCarbonCommandException] + { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage() + .contains("Unsupported alter operation on Carbon external fileformat 
table")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without index file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + //org.apache.spark.SparkException: Index file not present to read the carbondata file + val exception = intercept[java.lang.RuntimeException] + { + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage().contains("Index file not present to read the carbondata file")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + + test("Read sdk writer output file without Carbondata file should fail") { --- End diff -- I think read a table without index file should not fail. Like in case the CarbonWriter is using NO_SORT scope, in future it maybe not writing the index file. In that case, we should still be able to query on that path. I think better to create another PR for this requirement. Please raise another JIRA for this, and put a TODO here --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174691411 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + +class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData(false) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false) --- End diff -- please change all show to assert --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174691560 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + +class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData(false) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200,false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false) + + sql("select count(*) from sdkOutputTable").show(200,false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + + test("should not allow to alter datasource carbontable ") { --- End diff -- all these testcases are duplicated, can you move them to a common class and use it in 3 test suites --- |
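One way to realize this suggestion (a sketch; the trait name and layout are hypothetical): hoist the shared fixtures into a trait that all three suites mix in, so the path setup and helpers live in one place:

```scala
import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.test.util.QueryTest

// Shared fixture for the three SDK-writer test suites (hypothetical name).
trait SdkWriterTestFixture { this: QueryTest =>
  var writerPath: String = new File(this.getClass.getResource("/").getPath +
      "../../src/test/resources/SparkCarbonFileFormat/WriterOutput/")
    .getCanonicalPath
    .replace("\\", "/") // getCanonicalPath uses \ on Windows; tests expect /

  def cleanTestData(): Unit = FileUtils.deleteDirectory(new File(writerPath))

  // buildTestData(persistSchema) and deleteIndexFile(path, extension)
  // would move here unchanged from the suites above.
}
```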
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174693583 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +@InterfaceAudience.User +@InterfaceStability.Evolving +class SparkCarbonFileFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val filePaths = 
CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) + if (filePaths.size() == 0) { + throw new SparkException("CarbonData file is not present in the location mentioned in DDL") + } + val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0)) + val fileHeader = carbonHeaderReader.readHeader + val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader + .getColumn_schema + var colArray = ArrayBuffer[StructField]() + for (i <- 0 to table_columns.size() - 1) { + val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)) + colArray += (new StructField(col.getColumnName, + CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false)) + } + colArray.+:(Nil) + + Some(StructType(colArray)) + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + + new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new TextOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CarbonTablePath.CARBON_DATA_EXT + } + } + } + + override def shortName(): String = "Carbonfile" --- End diff -- make it non-capital: `carbonfile` --- |
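Once the short name is the lower-case `carbonfile`, the source can be addressed by name from the DataFrame reader; a usage sketch (the path is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("carbonfile-reader")
  .getOrCreate()

// "carbonfile" resolves via DataSourceRegister once the short name is
// lower-cased as suggested.
val df = spark.read.format("carbonfile").load("/path/to/WriterOutput")
df.show(false)
```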
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174693784

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---

@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format of CarbonData file.
+ *
+ * @param <T>
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {

--- End diff --

I think there is a lot of duplicated code in `CarbonFileInputFormat` and `CarbonTableInputFormat`; can you create a superclass called `CarbonInputFormat` and move all the duplicated code into it?

---
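For readers following the review, here is a minimal sketch of the extraction being requested, assuming both formats currently duplicate helpers such as reading the serialized filter from the job configuration. The class shape, method name, and property key below are illustrative only, not the project's actual code:

```java
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical superclass that would absorb logic duplicated between
// CarbonFileInputFormat and CarbonTableInputFormat. Each concrete format
// would keep only its own split generation and record-reader creation.
public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T>
    implements Serializable {

  // Illustrative property key; the real constant would come from the
  // existing input format implementations.
  private static final String FILTER_PREDICATE =
      "mapreduce.input.carboninputformat.filter.predicate";

  // Example of a shared helper: both subclasses need to read the serialized
  // filter expression from the job configuration before pruning splits.
  protected String getSerializedFilterExpression(Configuration configuration) {
    return configuration.get(FILTER_PREDICATE);
  }
}
```

With a superclass like this in place, `CarbonFileInputFormat` and `CarbonTableInputFormat` would each extend `CarbonInputFormat<T>` and shrink down to their format-specific pieces.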
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174694029

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---

@@ -826,6 +826,12 @@ public boolean isExternalTable() {
     return external != null && external.equalsIgnoreCase("true");
   }

+  public boolean isFileLevelExternalTable() {

--- End diff --

Do not call it "external"; just changing it to `isFileLevelFormat` is fine.

---
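As a rough illustration of the suggested rename, assuming the flag is stored in the table properties the same way the external flag is (the `"_filelevelformat"` key below is an assumption, not the project's actual property name):

```java
import java.util.Map;

// Standalone sketch of the renamed check; in CarbonTable itself this would
// read from the fact table's properties map.
public final class FileLevelFormatCheck {

  // "_filelevelformat" is a hypothetical property key used for illustration.
  public static boolean isFileLevelFormat(Map<String, String> tableProperties) {
    String fileLevel = tableProperties.get("_filelevelformat");
    return fileLevel != null && fileLevel.equalsIgnoreCase("true");
  }
}
```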
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2055#discussion_r174694130

--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---

@@ -2068,6 +2079,202 @@ private static void updateDecimalType(TableInfo tableInfo) {
     return tableInfo;
   }

+  public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
+    if (DataTypes.isDecimal(dataType)) {
+      DecimalType decimalType = (DecimalType) dataType;
+      decimalType.setPrecision(externalColumnSchema.getPrecision());
+      decimalType.setScale(externalColumnSchema.getScale());
+    }
+    wrapperColumnSchema.setDataType(dataType);
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
+    wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+    List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+        externalColumnSchema.getParentColumnTableRelations();
+    if (null != parentColumnTableRelation) {
+      wrapperColumnSchema.setParentColumnTableRelations(
+          fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+    }
+    return wrapperColumnSchema;
+  }
+
+  static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+      List<org.apache.carbondata.format.ParentColumnTableRelation> thirftParentColumnRelation) {
+    List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+    for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+        thirftParentColumnRelation) {
+      RelationIdentifier relationIdentifier =
+          new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+              carbonTableRelation.getRelationIdentifier().getTableName(),
+              carbonTableRelation.getRelationIdentifier().getTableId());
+      ParentColumnTableRelation parentColumnTableRelation =
+          new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+              carbonTableRelation.getColumnName());
+      parentColumnTableRelationList.add(parentColumnTableRelation);
+    }
+    return parentColumnTableRelationList;
+  }
+
+  static Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+    }
+  }
+
+  static DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
+      case STRING:
+        return DataTypes.STRING;
+      case SHORT:
+        return DataTypes.SHORT;
+      case INT:
+        return DataTypes.INT;
+      case LONG:
+        return DataTypes.LONG;
+      case DOUBLE:
+        return DataTypes.DOUBLE;
+      case DECIMAL:
+        return DataTypes.createDefaultDecimalType();
+      case DATE:
+        return DataTypes.DATE;
+      case TIMESTAMP:
+        return DataTypes.TIMESTAMP;
+      case ARRAY:
+        return DataTypes.createDefaultArrayType();
+      case STRUCT:
+        return DataTypes.createDefaultStructType();
+      default:
+        return DataTypes.STRING;
+    }
+  }
+
+  public static List<String> getFilePathExternalFilePath(String path) {
+
+    // return the list of carbondata files in the given path.
+    CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    List<String> filePaths = new ArrayList<>(dataFiles.length);
+    for (CarbonFile dfiles : dataFiles) {
+      filePaths.add(dfiles.getAbsolutePath());
+    }
+    return filePaths;
+  }
+
+  /**
+   * This method will read the schema file from a given path
+   *
+   * @param schemaFilePath
+   * @return
+   */
+  public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(

--- End diff --

Rename to `inferSchema`.

---
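Two observations for readers skimming this diff: `thriftDataTyopeToWrapperDataType` silently maps any unrecognized Thrift type to `DataTypes.STRING` in its `default` branch, and `getFilePathExternalFilePath` is just a directory listing filtered to CarbonData fact files. A plain-JDK sketch of the latter, assuming `CarbonCommonConstants.FACT_FILE_EXT` resolves to `".carbondata"` (an assumption here, not quoted from the constant), behaves like this:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public final class CarbonFileLister {

  // Assumed to mirror CarbonCommonConstants.FACT_FILE_EXT.
  private static final String FACT_FILE_EXT = ".carbondata";

  // Returns the absolute paths of all CarbonData fact files directly
  // inside the given directory, e.g. listCarbonDataFiles("/tmp/external").
  public static List<String> listCarbonDataFiles(String path) {
    File[] dataFiles = new File(path).listFiles(
        (dir, name) -> name.endsWith(FACT_FILE_EXT));
    List<String> filePaths = new ArrayList<>();
    if (dataFiles != null) {
      for (File dataFile : dataFiles) {
        filePaths.add(dataFile.getAbsolutePath());
      }
    }
    return filePaths;
  }
}
```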