Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3812/ ---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181588502

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
@@ -68,19 +78,16 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
       filter: Expression): java.util.Iterator[CarbonRow] = {
     require(path != null)
     require(projectColumns != null)
-    val table = getTable(path)
-    val rdd = new CarbonScanRDD[CarbonRow](
-      spark = session,
-      columnProjection = new CarbonProjection(projectColumns),
-      filterExpression = filter,
-      identifier = table.getAbsoluteTableIdentifier,
-      serializedTableInfo = table.getTableInfo.serialize,
-      tableInfo = table.getTableInfo,
-      inputMetricsStats = new CarbonInputMetrics,
-      partitionNames = null,
-      dataTypeConverterClz = null,
-      readSupportClz = classOf[CarbonRowReadSupport])
-    rdd.collect
+    scan(getTable(path), projectColumns, filter)
+  }
+
+  def scan(
+      carbonTable: CarbonTable,
+      projectColumns: Array[String],
--- End diff --

Why not use `QueryProjection` instead of an array?

---
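For context, the removed code already wrapped the raw column names before handing them to CarbonScanRDD. A minimal Java sketch of that wrapping step, using the `CarbonProjection` constructor visible in the diff above (the import path is assumed from the carbondata-hadoop module, and whether `QueryProjection` offers an equivalent constructor is not shown in this thread):

import org.apache.carbondata.hadoop.CarbonProjection;

public class ProjectionSketch {
  // Wrap the bare column names in a typed projection object, as the removed
  // CarbonScanRDD construction did; a dedicated type such as QueryProjection
  // would keep column handling in one place instead of passing String[] around.
  static CarbonProjection toProjection(String[] projectColumns) {
    return new CarbonProjection(projectColumns);
  }
}

---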
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181589099

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+import org.apache.carbondata.store.protocol.SearchRequest;
+import org.apache.carbondata.store.protocol.SearchResult;
+import org.apache.carbondata.store.util.GrpcSerdes;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Thread runnable for handling SearchRequest from master.
+ */
+@InterfaceAudience.Internal
+class SearchRequestHandler implements Runnable {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
+  private boolean running = true;
+  private Queue<SearchService.SearchRequestContext> requestQueue;
+
+  SearchRequestHandler(Queue<SearchService.SearchRequestContext> requestQueue) {
+    this.requestQueue = requestQueue;
+  }
+
+  public void run() {
+    while (running) {
+      SearchService.SearchRequestContext requestContext = requestQueue.poll();
+      if (requestContext == null) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      } else {
+        try {
+          List<CarbonRow> rows = handleRequest(requestContext);
+          sendSuccessResponse(requestContext, rows);
+        } catch (IOException | InterruptedException e) {
+          LOG.error(e);
+          sendFailureResponse(requestContext, e);
+        }
+      }
+    }
+  }
+
+  public void stop() {
+    running = false;
+  }
+
+  /**
+   * Builds {@link QueryModel} and read data from files
+   */
+  private List<CarbonRow> handleRequest(SearchService.SearchRequestContext requestContext)
+      throws IOException, InterruptedException {
+    SearchRequest request = requestContext.getRequest();
+    TableInfo tableInfo = GrpcSerdes.deserialize(request.getTableInfo());
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    QueryModel queryModel = createQueryModel(table, request);
+
+    // the request contains CarbonMultiBlockSplit and reader will read multiple blocks
+    // by using a thread pool
+    CarbonMultiBlockSplit mbSplit = getMultiBlockSplit(request);
+
+    // If there is FGDataMap, prune the split by applying FGDataMap
+    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
--- End diff --

How did you avoid pruning from the driver side? Please make sure that it does not prune twice.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181589448

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
+  def scan(
+      carbonTable: CarbonTable,
+      projectColumns: Array[String],
--- End diff --

Now I removed this function and use the `scan` interface only.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181612979

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.rdd

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.net.InetAddress
--- End diff --

Unused import, please remove.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181613087

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
@@ -59,6 +60,8 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.store.master.Master
+import org.apache.carbondata.store.worker.Worker
--- End diff --

All of the newly added imports above are unused, please remove them.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181613222

--- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
@@ -26,7 +26,6 @@
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
--- End diff --

Please also remove the unused imports on lines 38 and 45.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181617779

--- Diff: pom.xml ---
@@ -121,6 +122,8 @@
     <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
     <script.exetension>.sh</script.exetension>
     <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
+    <grpc.version>1.10.0</grpc.version>
+    <netty.version>4.0.43.Final</netty.version>
--- End diff --

Better to use the same netty version as Spark; otherwise a lot of exclusions need to be done.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181620436

--- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.master;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.store.protocol.EchoRequest;
+import org.apache.carbondata.store.protocol.EchoResponse;
+import org.apache.carbondata.store.protocol.SearchRequest;
+import org.apache.carbondata.store.protocol.SearchResult;
+import org.apache.carbondata.store.protocol.ShutdownRequest;
+import org.apache.carbondata.store.protocol.ShutdownResponse;
+import org.apache.carbondata.store.protocol.WorkerGrpc;
+import org.apache.carbondata.store.util.GrpcSerdes;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Master of CarbonSearch.
+ * It listens to {@link Master#DEFAULT_PORT} to wait for worker to register.
+ * And it provides search API to fire RPC call to workers.
+ */
+@InterfaceAudience.Internal
+public class Master {
+
+  private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
+
+  public static final int DEFAULT_PORT = 10020;
+
+  private Server registryServer;
+
+  private int port;
+
+  private Random random = new Random();
+
+  /** mapping of worker hostname to rpc stub */
+  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
+
+  public Master() {
+    this(DEFAULT_PORT);
+  }
+
+  public Master(int port) {
+    this.port = port;
+    this.workers = new ConcurrentHashMap<>();
+  }
+
+  /** start service and listen on port passed in constructor */
+  public void startService() throws IOException {
+    if (registryServer == null) {
+      /* The port on which the registryServer should run */
+      registryServer = ServerBuilder.forPort(port)
+          .addService(new RegistryService(this))
+          .build()
+          .start();
+      LOG.info("Master started, listening on " + port);
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override public void run() {
+          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+          LOG.info("*** shutting down gRPC Master since JVM is shutting down");
+          stopService();
+          LOG.info("*** Master shut down");
+        }
+      });
+    }
+  }
+
+  public void stopService() {
+    if (registryServer != null) {
+      registryServer.shutdown();
+    }
+  }
+
+  public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
+    ShutdownRequest request = ShutdownRequest.newBuilder()
+        .setTrigger(ShutdownRequest.Trigger.USER)
+        .build();
+    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
+      ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
+      ShutdownResponse response = future.get();
+      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
+        LOG.error("failed to shutdown worker: " + response.getMessage());
+        throw new IOException(response.getMessage());
+      } else {
+        workers.remove(worker.getKey());
+      }
+    }
+  }
+
+  /**
+   * Await termination on the main thread since the grpc library uses daemon threads.
+   */
+  private void blockUntilShutdown() throws InterruptedException {
+    if (registryServer != null) {
+      registryServer.awaitTermination();
+    }
+  }
+
+  /** A new searcher is trying to register, add it to the map and connect to this searcher */
+  void addWorker(String workerHostname, int port, int cores)
+      throws ExecutionException, InterruptedException {
+    Objects.requireNonNull(workerHostname);
+
+    LOG.info("trying to connect to searcher " + workerHostname + ":" + port);
+    ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
+        .usePlaintext(true)
+        .maxInboundMessageSize(200 * 1000 * 1000)
+        .build();
+    WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
+
+    // try to send a message to worker as a test
+    tryEcho(futureStub);
+    workers.put(workerHostname, futureStub);
+  }
+
+  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
+      throws ExecutionException, InterruptedException {
+    EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
+    LOG.info("echo to searcher: " + request.getMessage());
+    ListenableFuture<EchoResponse> response = stub.echo(request);
+    try {
+      LOG.info("echo from searcher: " + response.get().getMessage());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("failed to echo: " + e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Execute search by firing RPC call to worker, return the result rows
+   */
+  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
+      throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
+    Objects.requireNonNull(table);
+    Objects.requireNonNull(columns);
+
+    if (workers.size() == 0) {
+      throw new IOException("No searcher is available");
+    }
+
+    int queryId = random.nextInt();
+
+    // Build a SearchRequest
+    SearchRequest.Builder builder = SearchRequest.newBuilder()
+        .setQueryId(queryId)
+        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
+    for (String column : columns) {
+      builder.addProjectColumns(column);
+    }
+    if (filter != null) {
+      builder.setFilterExpression(GrpcSerdes.serialize(filter));
+    }
+
+    // prune data and get a mapping of worker hostname to list of blocks,
+    // add these blocks to the SearchRequest and fire the RPC call
+    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
+
+    List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
+
+    for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
+      String hostname = entry.getKey();
+      List<Distributable> blocks = entry.getValue();
+      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
+      ByteArrayOutputStream stream = new ByteArrayOutputStream();
+      DataOutput dataOutput = new DataOutputStream(stream);
+      mbSplit.write(dataOutput);
+      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
+
+      SearchRequest request = builder.build();
+
+      // do RPC to worker asynchronously and concurrently
+      ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
+      futures.add(future);
+    }
+
+    // get all results from RPC response and return to caller
+    List<CarbonRow> output = new LinkedList<>();
+    for (ListenableFuture<SearchResult> future : futures) {
+      SearchResult result = future.get();
+      if (result.getQueryId() != queryId) {
+        throw new IOException(String.format(
+            "queryId in response does not match request: %d != %d", result.getQueryId(), queryId));
+      }
+      collectResult(result, output);
+    }
+    return output.toArray(new CarbonRow[output.size()]);
+  }
+
+  /**
+   * Prune data by using CarbonInputFormat.getSplit
+   * Return a mapping of hostname to list of block
+   */
+  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
+      Expression filter) throws IOException, InvalidConfigurationException {
+    JobConf jobConf = new JobConf(new Configuration());
+    Job job = new Job(jobConf);
+    CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+        job, table, columns, filter, null, null);
+
+    List<InputSplit> splits = format.getSplits(job);
+    List<Distributable> distributables = new ArrayList<>(splits.size());
+    for (InputSplit split : splits) {
+      distributables.add(((CarbonInputSplit)split));
+    }
+    return CarbonLoaderUtil.nodeBlockMapping(
+        distributables, -1, new ArrayList<String>(workers.keySet()),
+        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+  }
+
+  /**
+   * Fill result row to {@param output}
+   */
+  private void collectResult(SearchResult result, List<CarbonRow> output) throws IOException {
+    for (ByteString bytes : result.getRowList()) {
+      CarbonRow row = GrpcSerdes.deserialize(bytes);
+      output.add(row);
+    }
+  }
+
+  /** return hostname of all workers */
+  public Set<String> getWorkers() {
+    return workers.keySet();
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException {
--- End diff --

What is the use of `main` here? Is it for testing?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181624234

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
+    // If there is FGDataMap, prune the split by applying FGDataMap
+    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
--- End diff --

I am adding an option in the Hadoop configuration so that the Master will set it before calling getSplits.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181630182

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util.concurrent.{Executors, ExecutorService}
+
+import org.apache.spark.sql.{CarbonSession, SparkSession}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * An example that demonstrate how to run queries in search mode,
+ * and compare the performance between search mode and SparkSQL
+ */
+// scalastyle:off
+object SearchModeExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
--- End diff --

Please change the app name.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5037/ ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181632817

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
+import java.net.InetAddress
--- End diff --

Fixed.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181632873

--- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
--- End diff --

Fixed.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181632923

--- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
+  public static void main(String[] args) throws IOException, InterruptedException {
--- End diff --

Yes.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181633019

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
--- End diff --

Fixed.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181633683

--- Diff: pom.xml ---
+    <grpc.version>1.10.0</grpc.version>
+    <netty.version>4.0.43.Final</netty.version>
--- End diff --

Yes. I checked Spark 2.2: it uses netty 4.0.43, see https://github.com/apache/spark/blob/branch-2.2/pom.xml#L556. Because the carbon-store-search module does not depend on Spark, I need to set the version explicitly here.

---
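As an illustration of how the two properties would typically be consumed in the carbon-store-search pom; the dependency entries below are assumptions, since the module's actual dependency section is not quoted in this thread:

<!-- Sketch only: pin grpc and align netty with Spark 2.2 via the new properties. -->
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty</artifactId>
  <version>${grpc.version}</version>
</dependency>
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>${netty.version}</version>
</dependency>

---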
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5038/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3821/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5042/ ---