GitHub user shivamasn opened a pull request:
https://github.com/apache/carbondata/pull/2951

[OpenSource SDV] Add datasource testcases for DLI

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shivamasn/carbondata master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2951.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2951

----

commit 2f1db6c1ce85e288f875a3cd4d4c5d199279ab69
Author: shivamasn <shivamasn17@...>
Date:   2018-11-26T10:34:08Z

    Add datasource testcases for DLI

----
--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2951 Can one of the admins verify this patch? --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2951 Can one of the admins verify this patch? --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r236646395 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/dli/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.dli + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + CarbonProperties.getInstance() --- End diff -- Remove this line --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r236646305 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/dli/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.dli + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + --- End diff -- Dont leave blank lines --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r236647192 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/dli/SparkCarbonDataSourceTestCase.scala --- @@ -0,0 +1,1267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.cluster.sdv.generated.dli + + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream} + --- End diff -- Remove extra lines --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r236646951 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/dli/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.dli + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil + +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, + CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT) + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, but the code expects /. 
+ writerPath = writerPath.replace("\\", "/"); + + def buildTestData(): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case _: Throwable => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO + } + + sql("Describe formatted sdkOutputTable").show(false) --- End diff -- No need to use .show(). All these queries can be removed --- |
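The point of this comment is that `.show()` only prints output for manual inspection; the test should rely on assertions alone. A minimal sketch of the intent, reusing names from the quoted diff (the exact assertions are illustrative, not the author's final code):

```scala
// Debug-only output; drop it rather than keeping it in the test:
// sql("Describe formatted sdkOutputTable").show(false)

// Keep assertion-based checks only, so a regression actually fails the test.
checkAnswer(
  sql("select * from sdkOutputTable"),
  sql(s"select * from carbon.`$writerPath`"))
assert(sql("select * from sdkOutputTable where name = 'robot0'").collect().length == 1)
```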
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r236647791 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/dli/SparkCarbonDataSourceTestCase.scala --- @@ -0,0 +1,1267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.cluster.sdv.generated.dli + + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream} + + + +import scala.collection.mutable + +import org.apache.avro +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord} +import org.apache.avro.io.{DecoderFactory, Encoder} +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.common.util.QueryTest +import org.junit.Assert +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} +import org.apache.carbondata.hadoop.testutil.StoreCreator +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} + +class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll { + + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + + val warehouse1 = FileFactory.getPath(s"$rootPath/integration/spark-datasource/target/warehouse").toString + + test("test write using dataframe") { + import sqlContext.implicits._ + val df = sqlContext.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + sql("drop table if exists testformat") + // Saves dataframe to carbon file + df.write + .format("carbon").saveAsTable("testformat") + assert(sql("select * from testformat").count() == 10) + assert(sql("select * from testformat where c1='a0'").count() == 1) + assert(sql("select * from testformat").count() == 10) + sql("drop table if exists testformat") + } + + test("test write using ddl") { + import sqlContext.implicits._ + val df = sqlContext.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + sql("drop table if exists testparquet") + sql("drop table if exists testformat") + // Saves dataframe to carbon file + df.write + .format("parquet").saveAsTable("testparquet") + sql("create table carbon_table(c1 string, c2 string, number int) using carbon") + sql("insert into carbon_table select * from testparquet") + checkAnswer(sql("select * from carbon_table where c1='a1'"), sql("select * from testparquet where c1='a1'")) + if 
(!sqlContext.sparkContext.version.startsWith("2.1")) { + val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size() + DataMapStoreManager.getInstance() + .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table")) + assert(mapSize >= DataMapStoreManager.getInstance().getAllDataMaps.size()) + } + sql("drop table if exists testparquet") + sql("drop table if exists testformat") + } + + test("test read with df write") { + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder")) + import sqlContext.implicits._ + val df = sqlContext.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + + // Saves dataframe to carbon file + df.write.format("carbon").save(warehouse1 + "/test_folder/") + + val frame = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder") + frame.show() --- End diff -- remove all .show calls --- |
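The same remark applies to the DataFrame-based tests: `frame.show()` only displays rows. A hedged sketch of replacing it with checks on the loaded DataFrame (the expected counts follow from the 1-to-10 range written in the quoted test):

```scala
val frame = sqlContext.read.format("carbon").load(warehouse1 + "/test_folder")
// frame.show()  // printing is not a verification; remove it

// Assert on the data instead of displaying it.
assert(frame.count() == 10)
assert(frame.where("c1 = 'a1'").count() == 1)
```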
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r237779383 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.datasource + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} +import scala.collection.JavaConverters._ +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, but the code expects /. 
+ writerPath = writerPath.replace("\\", "/"); + + def buildTestData(): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case _: Throwable => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else { + // TO DO --- End diff -- Need to handle for spark 2.3 also --- |
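The quoted CREATE TABLE branch distinguishes only Spark 2.1 and "2.2 and above", leaving an empty `else`. A hedged sketch of one way to make the 2.3 path explicit and fail fast on anything unexpected, assuming the `SparkUtil` helpers behave as they are used elsewhere in this diff:

```scala
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  // Spark 2.1: the carbon data source takes the path as an OPTION
  sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  // Spark 2.2 and 2.3: the LOCATION clause is supported
  sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // Fail loudly instead of silently creating nothing on an unknown version.
  throw new UnsupportedOperationException(
    "Unsupported Spark version: " + sqlContext.sparkContext.version)
}
```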
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r237780471 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.datasource --- End diff -- Please format all the newly added code --- |
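On the formatting remark: the newly added files import java, scala, Spark, and carbondata classes in one unsorted block. A sketch of the grouped ordering the existing test sources in the repository follow (java, then scala, then third-party, then org.apache.carbondata, separated by blank lines); the exact rules are whatever the project's scalastyle configuration enforces:

```scala
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}

import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
```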
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r237779541 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.datasource + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} +import scala.collection.JavaConverters._ +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, but the code expects /. 
+ writerPath = writerPath.replace("\\", "/"); + + def buildTestData(): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case _: Throwable => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else { + // TO DO + } + + val directSQL = sql(s"""select * FROM carbon.`$writerPath`""".stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), directSQL) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + // TODO: Make the sparkCarbonFileFormat to work without index file + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData() + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + // data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO + } + } + assert(exception.getMessage() + .contains("CarbonData file is not present in the table location")) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without index file should not fail") { + buildTestData() + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if 
(SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO --- End diff -- Need to handle for spark 2.3 also. Please check the other cases for the same --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r237779458 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.datasource + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} +import scala.collection.JavaConverters._ +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, but the code expects /. 
+ writerPath = writerPath.replace("\\", "/"); + + def buildTestData(): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case _: Throwable => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else { + // TO DO + } + + val directSQL = sql(s"""select * FROM carbon.`$writerPath`""".stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), directSQL) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + // TODO: Make the sparkCarbonFileFormat to work without index file + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData() + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + // data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO --- End diff -- Need to handle for spark 2.3 also --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2951#discussion_r237781038 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala --- @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.cluster.sdv.generated.datasource + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} +import scala.collection.JavaConverters._ +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.RandomStringUtils +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.util.SparkUtil +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter + +class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, but the code expects /. 
+ writerPath = writerPath.replace("\\", "/"); + + def buildTestData(): Any = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case _: Throwable => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else { + // TO DO + } + + val directSQL = sql(s"""select * FROM carbon.`$writerPath`""".stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), directSQL) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + // TODO: Make the sparkCarbonFileFormat to work without index file + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData() + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + // data source file format + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO + } + } + assert(exception.getMessage() + .contains("CarbonData file is not present in the table location")) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without index file should not fail") { + buildTestData() + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else if 
(SparkUtil.isSparkVersionXandAbove("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } else{ + // TO DO + } + //org.apache.spark.SparkException: Index file not present to read the carbondata file + assert(sql("select * from sdkOutputTable").collect().length == 100) + assert(sql("select * from sdkOutputTable where name='robot0'").collect().length == 1) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + test("Read data having multi blocklet and validate min max flag") { + buildTestDataMuliBlockLet(750000, 50000) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } + checkAnswer(sql("select count(*) from sdkOutputTable"),Seq(Row(800000))) + checkAnswer(sql( + "select count(*) from sdkOutputTable where from_email='Email for testing min max for " + + "allowed chars'"), + Seq(Row(50000))) + //expected answer for min max flag. FInally there should be 2 blocklets with one blocklet + // having min max flag as false for email column and other as true + val blocklet1MinMaxFlag = Array(true, true, true, true, true, false, true, true, true) + val blocklet2MinMaxFlag = Array(true, true, true, true, true, true, true, true, true) + val expectedMinMaxFlag = Array(blocklet1MinMaxFlag, blocklet2MinMaxFlag) + validateMinMaxFlag(expectedMinMaxFlag, 2) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + clearDataMapCache + cleanTestData() + } + def buildTestDataMuliBlockLet(recordsInBlocklet1 :Int, recordsInBlocklet2 :Int): Unit ={ + FileUtils.deleteDirectory(new File(writerPath)) + val fields=new Array[Field](8) + fields(0)=new Field("myid",DataTypes.INT) + fields(1)=new Field("event_id",DataTypes.STRING) + fields(2)=new Field("eve_time",DataTypes.DATE) + fields(3)=new Field("ingestion_time",DataTypes.TIMESTAMP) + fields(4)=new Field("alldate",DataTypes.createArrayType(DataTypes.DATE)) + fields(5)=new Field("subject",DataTypes.STRING) + fields(6)=new Field("from_email",DataTypes.STRING) + fields(7)=new Field("sal",DataTypes.DOUBLE) + import scala.collection.JavaConverters._ + val emailDataBlocklet1 = "FromEmail" + val emailDataBlocklet2 = "Email for testing min max for allowed chars" + try{ + val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava + val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16) + .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options) + .withCsvInput(new Schema(fields)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + val date_F=new SimpleDateFormat("yyyy-MM-dd") + for(i<- 1 to recordsInBlocklet1){ + val time=new Date(System.currentTimeMillis()) + writer.write(Array(""+i,"event_"+i,""+date_F.format(time),""+timeF.format(time),""+date_F.format(time)+"$"+date_F.format(time),"Subject_0",emailDataBlocklet1,""+new Random().nextDouble())) + } + for(i<- 1 to recordsInBlocklet2){ + val time=new Date(System.currentTimeMillis()) + 
writer.write(Array(""+i,"event_"+i,""+date_F.format(time),""+timeF.format(time),""+date_F.format(time)+"$"+date_F.format(time),"Subject_0",emailDataBlocklet2,""+new Random().nextDouble())) + } + writer.close() + } + } + + /** + * read carbon index file and validate the min max flag written in each blocklet + * + * @param expectedMinMaxFlag + * @param numBlocklets + */ + private def validateMinMaxFlag(expectedMinMaxFlag: Array[Array[Boolean]], + numBlocklets: Int): Unit = { + val carbonFiles: Array[File] = new File(writerPath).listFiles() + val carbonIndexFile = carbonFiles.filter(file => file.getName.endsWith(".carbonindex"))(0) + val converter: DataFileFooterConverter = new DataFileFooterConverter(sqlContext.sparkSession.sessionState + .newHadoopConf()) + val carbonIndexFilePath = FileFactory.getUpdatedFilePath(carbonIndexFile.getCanonicalPath) + val indexMetadata: List[DataFileFooter] = converter + .getIndexInfo(carbonIndexFilePath, null, false).asScala.toList + assert(indexMetadata.size == numBlocklets) + indexMetadata.zipWithIndex.foreach { filefooter => + val isMinMaxSet: Array[Boolean] = filefooter._1.getBlockletIndex.getMinMaxIndex.getIsMinMaxSet + assert(isMinMaxSet.sameElements(expectedMinMaxFlag(filefooter._2))) + } + } + + private def clearDataMapCache(): Unit = { + if (!sqlContext.sparkContext.version.startsWith("2.1")) { + val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size() + DataMapStoreManager.getInstance() + .clearDataMaps(AbsoluteTableIdentifier.from(writerPath)) + assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size()) + } + } + + test("Test with long string columns") { + FileUtils.deleteDirectory(new File(writerPath)) + // here we specify the long string column as varchar + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"address\":\"varchar\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"note\":\"varchar\"}\n") + .append("]") + .toString() + val builder = CarbonWriter.builder() + val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("CreateTableUsingSparkCarbonFileFormatTestCase").build() + val totalRecordsNum = 3 + for (i <- 0 until totalRecordsNum) { + // write a varchar with 75,000 length + writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000))) + } + writer.close() + + //--------------- data source external table with schema --------------------------- + sql("DROP TABLE IF EXISTS sdkOutputTable") + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) + |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """ + .stripMargin) + } else { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon + |OPTIONS("long_String_columns"="address, note") LOCATION + |'$writerPath' """.stripMargin) + } + checkAnswer(sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1))) + checkAnswer(sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"), + Seq(Row(totalRecordsNum))) + sql("DROP TABLE sdkOutputTable") + + //--------------- data source external table without schema --------------------------- + sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema") + if 
(sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH + |'$writerPath', "long_String_columns" "address, note") """.stripMargin) + } else { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS + |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin) + } + checkAnswer(sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1))) --- End diff -- check for record instead of count. handle same for other test cases as well --- |
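"Check for record instead of count" means comparing the rows that actually come back, not just how many, so a wrong value is caught as well as a wrong cardinality. A hedged sketch against the long-string test above, restricted to the deterministic columns (the address and note values are randomly generated, so they are left out of the comparison):

```scala
// Instead of only checking the number of matching rows:
// checkAnswer(sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))

// compare the record itself on its deterministic columns.
checkAnswer(
  sql("select name, age from sdkOutputTableWithoutSchema where age = 0"),
  Seq(Row("name_0", 0)))
```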
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/2951 @shivamasn Please add description for the PR. Also attach test report in the description. --- |
Github user shivamasn commented on the issue:
https://github.com/apache/carbondata/pull/2951 > @shivamasn Please add description for the PR. Also attach test report in the description. Done --- |
Github user brijoobopanna commented on the issue:
https://github.com/apache/carbondata/pull/2951 retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2951 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1749/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2951 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1960/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2951 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10008/ --- |
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/2951 @shivamasn Please add test cases for map type too --- |
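For reference, a hedged sketch of what a map-type test could look like through the data source DDL path. Whether and how this carbon version accepts map columns, and the table name and values used here, are assumptions; this only illustrates the shape of such a test:

```scala
test("test map type with carbon datasource") {
  sql("DROP TABLE IF EXISTS map_table")
  // assumes map<..> columns are accepted by the carbon data source in this version
  sql("CREATE TABLE map_table (id int, props map<string, int>) USING carbon")
  sql("INSERT INTO map_table SELECT 1, map('a', 1, 'b', 2)")
  // read back a value by key to verify both the write and the map lookup
  checkAnswer(
    sql("SELECT id, props['b'] FROM map_table"),
    Seq(Row(1, 2)))
  sql("DROP TABLE IF EXISTS map_table")
}
```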