Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162090652

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +    if (args.length < 4 || args.length > 5) {
    +      logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
    +        "<table-path> <spark-master> <s3-endpoint>")

--- End diff --

Modify to `<table-path-on-S3>`

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162090911

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +    if (args.length < 4 || args.length > 5) {
    +      logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
    +        "<table-path> <spark-master> <s3-endpoint>")

--- End diff --

Move `<spark-master>` to be the last parameter; it should be optional, with a default value of 'local'.

---
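A minimal sketch of that suggestion, assuming the argument order <access-key> <secret-key> <table-path-on-S3> [s3-endpoint] [spark-master] (the helper name and tuple shape are illustrative, not the PR's code):

    object ArgParseSketch {
      // Hypothetical helper: spark-master is the optional last argument and
      // defaults to "local", as the review comment suggests.
      def parse(args: Array[String]): (String, String, String, String, String) = {
        require(args.length >= 3 && args.length <= 5,
          "Usage: CarbonS3Example <access-key> <secret-key> <table-path-on-S3> " +
            "[s3-endpoint] [spark-master]")
        val endpoint = if (args.length >= 4) args(3) else ""      // "" when omitted
        val master = if (args.length == 5) args(4) else "local"   // default master
        (args(0), args(1), args(2), endpoint, master)
      }
    }

---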
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1805 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1679/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1805 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2912/ ---
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1805 Please rebase and drop the first two commits, which are already merged to the carbonstore branch. ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1805 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2961/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1805 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1703/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1805 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2937/ ---
Github user jatin9896 commented on the issue:
https://github.com/apache/carbondata/pull/1805 retest this please ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162304024

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---

    -    var storeLocation: String = null
    -    val carbonLoadModel = alterPartitionModel.carbonLoadModel
    -    val segmentId = alterPartitionModel.segmentId
    ...
    +        var storeLocation: String = null

--- End diff --

Can you put these codestyle modifications into a separate PR?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162304653

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +    // Use compaction command to merge segments or small files in object based storage,
    +    // this can be done periodically.
    +    spark.sql("ALTER table carbon_table compact 'MAJOR'")
    +    spark.sql("show segments for table carbon_table").show()

--- End diff --

Please do one more `select *` query.

---
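What is being asked for is a verification read after the compaction, in the same style as the example's earlier queries (a sketch meant to slot into the example's main method, not the PR's final code):

    // Verify the table is still readable after the MAJOR compaction.
    spark.sql(
      s"""
         | SELECT *
         | FROM carbon_table
       """.stripMargin).show()

---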
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162304779

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getS3EndPoint(args: Array[String]): String = {
    +    if (args.length >= 4 && args(3).contains(".com")) {

--- End diff --

Why does this need to compare with ".com"?

---
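For context on why the question matters: `.contains(".com")` is a substring heuristic, and a spark master URL can itself contain ".com", so the check alone cannot reliably tell an endpoint from a master. A small illustration with made-up values:

    object EndpointHeuristicCheck extends App {
      // Both of these could appear at args(3) in a four-argument invocation:
      val endpointArg = "s3.amazonaws.com"                 // an S3 endpoint
      val masterArg = "spark://master.example.com:7077"    // a spark master URL
      // The substring heuristic matches both, so it alone cannot tell them apart.
      println(endpointArg.contains(".com"))  // true
      println(masterArg.contains(".com"))    // true
    }

---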
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162304832

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getS3EndPoint(args: Array[String]): String = {
    +    if (args.length >= 4 && args(3).contains(".com")) {
    +      args(3)
    +    }
    +    else {

--- End diff --

Move `else` one line up, onto the same line as the closing brace.

---
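The requested change is the conventional Scala brace style; a sketch of the same logic with the brace moved:

    object S3EndPointSketch {
      // Same logic as the quoted code, with `else` on the closing-brace line.
      def getS3EndPoint(args: Array[String]): String = {
        if (args.length >= 4 && args(3).contains(".com")) {
          args(3)
        } else {
          ""
        }
      }
    }

---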
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162304995

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getSparkMaster(args: Array[String]): String = {
    +    if (args.length >= 4) {
    +      if (args.length == 5) {

--- End diff --

This logic can be optimized; you can make it one level of `if ... else if`.

---
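A sketch of the flattened version the reviewer suggests, preserving the behavior of the nested code quoted in this thread:

    object SparkMasterSketch {
      // One level of if / else if, per the review comment; behavior matches
      // the nested version (5 args -> args(4); a master-looking 4th arg -> args(3);
      // otherwise the "local" default).
      def getSparkMaster(args: Array[String]): String = {
        if (args.length == 5) {
          args(4)
        } else if (args.length == 4 &&
                   (args(3).contains("spark:") || args(3).contains("mesos:"))) {
          args(3)
        } else {
          "local"
        }
      }
    }

---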
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162305106

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +      if (args.length == 5) {
    +        args(4)
    +      }
    +      else if (args(3).contains("spark:") || args(3).contains("mesos:")) {

--- End diff --

`else if` should be on the same line as `}`.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162305178

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +      else {
    +        "local"
    +      }
    +    }
    +    else {

--- End diff --

`else if` should be on the same line as `}`.

---
Github user jatin9896 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162305664

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getS3EndPoint(args: Array[String]): String = {
    +    if (args.length >= 4 && args(3).contains(".com")) {

--- End diff --

It might happen that args(4) is the spark master.

---
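In other words, the optional arguments make position 3 ambiguous, and the ".com" check is how the example guesses which one it got. One alternative is to decide by URL scheme rather than by substring; a sketch under that assumption (the prefixes checked below are illustrative, not the PR's code):

    object EndpointMasterSketch {
      // Treat an argument as the spark master only if it looks like a master URL.
      def looksLikeMaster(arg: String): Boolean =
        arg.startsWith("spark://") || arg.startsWith("mesos://") ||
          arg.startsWith("yarn") || arg.startsWith("local")

      // Returns (s3-endpoint, spark-master); "" means no endpoint was given.
      def endpointAndMaster(args: Array[String]): (String, String) = {
        if (args.length == 5) {
          (args(3), args(4))                       // both were given
        } else if (args.length == 4 && looksLikeMaster(args(3))) {
          ("", args(3))                            // only a master was given
        } else if (args.length == 4) {
          (args(3), "local")                       // only an endpoint was given
        } else {
          ("", "local")                            // neither was given
        }
      }
    }

---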
Github user jatin9896 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162306737

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getSparkMaster(args: Array[String]): String = {
    +    if (args.length >= 4) {
    +      if (args.length == 5) {

--- End diff --

sure

---
Github user jatin9896 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1805#discussion_r162311496

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---

    +  def getSparkMaster(args: Array[String]): String = {
    +    if (args.length >= 4) {
    +      if (args.length == 5) {

--- End diff --

done

---