[GitHub] carbondata pull request #2148: [CARBONDATA-2323][WIP] Distributed search mod...


[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3812/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181588502
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
    @@ -68,19 +78,16 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
           filter: Expression): java.util.Iterator[CarbonRow] = {
         require(path != null)
         require(projectColumns != null)
    -    val table = getTable(path)
    -    val rdd = new CarbonScanRDD[CarbonRow](
    -      spark = session,
    -      columnProjection = new CarbonProjection(projectColumns),
    -      filterExpression = filter,
    -      identifier = table.getAbsoluteTableIdentifier,
    -      serializedTableInfo = table.getTableInfo.serialize,
    -      tableInfo = table.getTableInfo,
    -      inputMetricsStats = new CarbonInputMetrics,
    -      partitionNames = null,
    -      dataTypeConverterClz = null,
    -      readSupportClz = classOf[CarbonRowReadSupport])
    -    rdd.collect
    +    scan(getTable(path), projectColumns, filter)
    +  }
    +
    +  def scan(
    +      carbonTable: CarbonTable,
    +      projectColumns: Array[String],
    --- End diff --
   
    Why not use `QueryProjection` instead of an array?


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181589099
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.protobuf.ByteString;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +class SearchRequestHandler implements Runnable {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +  private boolean running = true;
    +  private Queue<SearchService.SearchRequestContext> requestQueue;
    +
    +  SearchRequestHandler(Queue<SearchService.SearchRequestContext> requestQueue) {
    +    this.requestQueue = requestQueue;
    +  }
    +
    +  public void run() {
    +    while (running) {
    +      SearchService.SearchRequestContext requestContext = requestQueue.poll();
    +      if (requestContext == null) {
    +        try {
    +          Thread.sleep(10);
    +        } catch (InterruptedException e) {
    +          LOG.error(e);
    +        }
    +      } else {
    +        try {
    +          List<CarbonRow> rows = handleRequest(requestContext);
    +          sendSuccessResponse(requestContext, rows);
    +        } catch (IOException | InterruptedException e) {
    +          LOG.error(e);
    +          sendFailureResponse(requestContext, e);
    +        }
    +      }
    +    }
    +  }
    +
    +  public void stop() {
    +    running = false;
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchService.SearchRequestContext requestContext)
    +      throws IOException, InterruptedException {
    +    SearchRequest request = requestContext.getRequest();
    +    TableInfo tableInfo = GrpcSerdes.deserialize(request.getTableInfo());
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +
    +    // the request contains CarbonMultiBlockSplit and reader will read multiple blocks
    +    // by using a thread pool
    +    CarbonMultiBlockSplit mbSplit = getMultiBlockSplit(request);
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    --- End diff --
   
    How did you avoid pruning on the driver side? Please make sure that it does not prune twice.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181589448
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
    @@ -68,19 +78,16 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
           filter: Expression): java.util.Iterator[CarbonRow] = {
         require(path != null)
         require(projectColumns != null)
    -    val table = getTable(path)
    -    val rdd = new CarbonScanRDD[CarbonRow](
    -      spark = session,
    -      columnProjection = new CarbonProjection(projectColumns),
    -      filterExpression = filter,
    -      identifier = table.getAbsoluteTableIdentifier,
    -      serializedTableInfo = table.getTableInfo.serialize,
    -      tableInfo = table.getTableInfo,
    -      inputMetricsStats = new CarbonInputMetrics,
    -      partitionNames = null,
    -      dataTypeConverterClz = null,
    -      readSupportClz = classOf[CarbonRowReadSupport])
    -    rdd.collect
    +    scan(getTable(path), projectColumns, filter)
    +  }
    +
    +  def scan(
    +      carbonTable: CarbonTable,
    +      projectColumns: Array[String],
    --- End diff --
   
    I have now removed this function and use only the `scan` interface.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181612979
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.carbondata.spark.rdd
     
     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +import java.net.InetAddress
    --- End diff --
   
    Unused import, please remove


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181613087
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -59,6 +60,8 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
     import org.apache.carbondata.processing.util.CarbonLoaderUtil
     import org.apache.carbondata.spark.InitInputMetrics
     import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
    +import org.apache.carbondata.store.master.Master
    +import org.apache.carbondata.store.worker.Worker
    --- End diff --
   
    All of the newly added imports above are unused; please remove them.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181613222
 
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -26,7 +26,6 @@
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.cache.dictionary.Dictionary;
    -import org.apache.carbondata.core.constants.CarbonCommonConstants;
    --- End diff --
   
    Please remove the unused imports on lines 38 and 45.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181617779
 
    --- Diff: pom.xml ---
    @@ -121,6 +122,8 @@
         <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
         <script.exetension>.sh</script.exetension>
         <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
    +    <grpc.version>1.10.0</grpc.version>
    +    <netty.version>4.0.43.Final</netty.version>
    --- End diff --
   
    Better to use the same netty version that Spark uses. Otherwise a lot of exclusions will be needed.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181620436
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.protocol.EchoRequest;
    +import org.apache.carbondata.store.protocol.EchoResponse;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.protocol.ShutdownRequest;
    +import org.apache.carbondata.store.protocol.ShutdownResponse;
    +import org.apache.carbondata.store.protocol.WorkerGrpc;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.protobuf.ByteString;
    +import io.grpc.ManagedChannel;
    +import io.grpc.ManagedChannelBuilder;
    +import io.grpc.Server;
    +import io.grpc.ServerBuilder;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It listens to {@link Master#DEFAULT_PORT} to wait for worker to register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +@InterfaceAudience.Internal
    +public class Master {
    +
    +  private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
    +
    +  public static final int DEFAULT_PORT = 10020;
    +
    +  private Server registryServer;
    +
    +  private int port;
    +
    +  private Random random = new Random();
    +
    +  /** mapping of worker hostname to rpc stub */
    +  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
    +
    +  public Master() {
    +    this(DEFAULT_PORT);
    +  }
    +
    +  public Master(int port) {
    +    this.port = port;
    +    this.workers = new ConcurrentHashMap<>();
    +  }
    +
    +  /** start service and listen on port passed in constructor */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +      /* The port on which the registryServer should run */
    +      registryServer = ServerBuilder.forPort(port)
    +          .addService(new RegistryService(this))
    +          .build()
    +          .start();
    +      LOG.info("Master started, listening on " + port);
    +      Runtime.getRuntime().addShutdownHook(new Thread() {
    +        @Override public void run() {
    +          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
    +          LOG.info("*** shutting down gRPC Master since JVM is shutting down");
    +          stopService();
    +          LOG.info("*** Master shut down");
    +        }
    +      });
    +    }
    +  }
    +
    +  public void stopService() {
    +    if (registryServer != null) {
    +      registryServer.shutdown();
    +    }
    +  }
    +
    +  public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
    +    ShutdownRequest request = ShutdownRequest.newBuilder()
    +        .setTrigger(ShutdownRequest.Trigger.USER)
    +        .build();
    +    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
    +      ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
    +      ShutdownResponse response = future.get();
    +      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
    +        LOG.error("failed to shutdown worker: " + response.getMessage());
    +        throw new IOException(response.getMessage());
    +      } else {
    +        workers.remove(worker.getKey());
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Await termination on the main thread since the grpc library uses daemon threads.
    +   */
    +  private void blockUntilShutdown() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.awaitTermination();
    +    }
    +  }
    +
    +  /** A new searcher is trying to register, add it to the map and connect to this searcher */
    +  void addWorker(String workerHostname, int port, int cores)
    +      throws ExecutionException, InterruptedException {
    +    Objects.requireNonNull(workerHostname);
    +
    +    LOG.info("trying to connect to searcher " + workerHostname + ":" + port);
    +    ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
    +        .usePlaintext(true)
    +        .maxInboundMessageSize(200 * 1000 * 1000)
    +        .build();
    +    WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
    +
    +    // try to send a message to worker as a test
    +    tryEcho(futureStub);
    +    workers.put(workerHostname, futureStub);
    +  }
    +
    +  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
    +      throws ExecutionException, InterruptedException {
    +    EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
    +    LOG.info("echo to searcher: " + request.getMessage());
    +    ListenableFuture<EchoResponse> response = stub.echo(request);
    +    try {
    +      LOG.info("echo from searcher: " + response.get().getMessage());
    +    } catch (InterruptedException | ExecutionException e) {
    +      LOG.error("failed to echo: " + e.getMessage());
    +      throw e;
    +    }
    +  }
    +
    +  /**
    +   * Execute search by firing RPC call to worker, return the result rows
    +   */
    +  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
    +      throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
    +    Objects.requireNonNull(table);
    +    Objects.requireNonNull(columns);
    +
    +    if (workers.size() == 0) {
    +      throw new IOException("No searcher is available");
    +    }
    +
    +    int queryId = random.nextInt();
    +
    +    // Build a SearchRequest
    +    SearchRequest.Builder builder = SearchRequest.newBuilder()
    +        .setQueryId(queryId)
    +        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
    +    for (String column : columns) {
    +      builder.addProjectColumns(column);
    +    }
    +    if (filter != null) {
    +      builder.setFilterExpression(GrpcSerdes.serialize(filter));
    +    }
    +
    +    // prune data and get a mapping of worker hostname to list of blocks,
    +    // add these blocks to the SearchRequest and fire the RPC call
    +    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
    +
    +    List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
    +
    +    for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
    +      String hostname = entry.getKey();
    +      List<Distributable> blocks = entry.getValue();
    +      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
    +      ByteArrayOutputStream stream = new ByteArrayOutputStream();
    +      DataOutput dataOutput = new DataOutputStream(stream);
    +      mbSplit.write(dataOutput);
    +      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
    +
    +      SearchRequest request = builder.build();
    +
    +      // do RPC to worker asynchronously and concurrently
    +      ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
    +      futures.add(future);
    +    }
    +
    +    // get all results from RPC response and return to caller
    +    List<CarbonRow> output = new LinkedList<>();
    +    for (ListenableFuture<SearchResult> future : futures) {
    +      SearchResult result = future.get();
    +      if (result.getQueryId() != queryId) {
    +        throw new IOException(String.format(
    +            "queryId in response does not match request: %d != %d", result.getQueryId(), queryId));
    +      }
    +      collectResult(result, output);
    +    }
    +    return output.toArray(new CarbonRow[output.size()]);
    +  }
    +
    +  /**
    +   * Prune data by using CarbonInputFormat.getSplit
    +   * Return a mapping of hostname to list of block
    +   */
    +  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
    +      Expression filter) throws IOException, InvalidConfigurationException {
    +    JobConf jobConf = new JobConf(new Configuration());
    +    Job job = new Job(jobConf);
    +    CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
    +        job, table, columns, filter, null, null);
    +
    +    List<InputSplit> splits = format.getSplits(job);
    +    List<Distributable> distributables = new ArrayList<>(splits.size());
    +    for (InputSplit split : splits) {
    +      distributables.add(((CarbonInputSplit)split));
    +    }
    +    return CarbonLoaderUtil.nodeBlockMapping(
    +        distributables, -1, new ArrayList<String>(workers.keySet()),
    +        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
    +  }
    +
    +  /**
    +   * Fill result row to {@param output}
    +   */
    +  private void collectResult(SearchResult result,  List<CarbonRow> output) throws IOException {
    +    for (ByteString bytes : result.getRowList()) {
    +      CarbonRow row = GrpcSerdes.deserialize(bytes);
    +      output.add(row);
    +    }
    +  }
    +
    +  /** return hostname of all workers */
    +  public Set<String> getWorkers() {
    +    return workers.keySet();
    +  }
    +
    +  public static void main(String[] args) throws IOException, InterruptedException {
    --- End diff --
   
    What is the use of `main` here? Is it for testing?
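
For readers following the quoted `Master.search` diff, its fan-out and gather pattern can be sketched in isolation: send one request per worker host, collect the futures, then check that each response echoes the request's query id before merging rows. This is a minimal sketch, not the PR's code. `Result`, `WorkerStub`, and the use of `CompletableFuture` and plain strings for blocks and rows are hypothetical stand-ins for `SearchResult`, the gRPC future stub, and CarbonData's split and row types.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;

/** Sketch only: plain Java stand-ins for the gRPC stub and protobuf messages. */
final class FanOutSearchSketch {

  /** Hypothetical stand-in for SearchResult: the echoed query id plus result rows. */
  static final class Result {
    final int queryId;
    final List<String> rows;

    Result(int queryId, List<String> rows) {
      this.queryId = queryId;
      this.rows = rows;
    }
  }

  /** Hypothetical stand-in for a worker stub: (queryId, blocks) -> future result. */
  interface WorkerStub
      extends BiFunction<Integer, List<String>, CompletableFuture<Result>> {}

  /** Fires one asynchronous call per host, then gathers and validates the responses. */
  static List<String> search(int queryId, Map<String, WorkerStub> workers,
      Map<String, List<String>> nodeBlockMapping)
      throws IOException, ExecutionException, InterruptedException {
    List<CompletableFuture<Result>> futures = new ArrayList<>(nodeBlockMapping.size());
    for (Map.Entry<String, List<String>> entry : nodeBlockMapping.entrySet()) {
      WorkerStub stub = workers.get(entry.getKey());
      if (stub == null) {
        throw new IOException("No worker registered for host " + entry.getKey());
      }
      // The call returns immediately; all workers run concurrently.
      futures.add(stub.apply(queryId, entry.getValue()));
    }

    List<String> output = new ArrayList<>();
    for (CompletableFuture<Result> future : futures) {
      Result result = future.get();
      if (result.queryId != queryId) {
        throw new IOException(String.format(
            "queryId in response does not match request: %d != %d",
            result.queryId, queryId));
      }
      output.addAll(result.rows);
    }
    return output;
  }

  public static void main(String[] args) throws Exception {
    // A toy worker that simply returns the blocks it was asked to scan.
    WorkerStub echo =
        (id, blocks) -> CompletableFuture.supplyAsync(() -> new Result(id, blocks));
    Map<String, WorkerStub> workers = new LinkedHashMap<>();
    workers.put("host-a", echo);
    workers.put("host-b", echo);
    Map<String, List<String>> mapping = new LinkedHashMap<>();
    mapping.put("host-a", Arrays.asList("block-1", "block-2"));
    mapping.put("host-b", Arrays.asList("block-3"));
    System.out.println(search(42, workers, mapping));
  }
}
```

The explicit null check on the stub is an addition in this sketch; the quoted diff calls `workers.get(hostname).search(request)` directly.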


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181624234
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.protobuf.ByteString;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +class SearchRequestHandler implements Runnable {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +  private boolean running = true;
    +  private Queue<SearchService.SearchRequestContext> requestQueue;
    +
    +  SearchRequestHandler(Queue<SearchService.SearchRequestContext> requestQueue) {
    +    this.requestQueue = requestQueue;
    +  }
    +
    +  public void run() {
    +    while (running) {
    +      SearchService.SearchRequestContext requestContext = requestQueue.poll();
    +      if (requestContext == null) {
    +        try {
    +          Thread.sleep(10);
    +        } catch (InterruptedException e) {
    +          LOG.error(e);
    +        }
    +      } else {
    +        try {
    +          List<CarbonRow> rows = handleRequest(requestContext);
    +          sendSuccessResponse(requestContext, rows);
    +        } catch (IOException | InterruptedException e) {
    +          LOG.error(e);
    +          sendFailureResponse(requestContext, e);
    +        }
    +      }
    +    }
    +  }
    +
    +  public void stop() {
    +    running = false;
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchService.SearchRequestContext requestContext)
    +      throws IOException, InterruptedException {
    +    SearchRequest request = requestContext.getRequest();
    +    TableInfo tableInfo = GrpcSerdes.deserialize(request.getTableInfo());
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +
    +    // the request contains CarbonMultiBlockSplit and reader will read multiple blocks
    +    // by using a thread pool
    +    CarbonMultiBlockSplit mbSplit = getMultiBlockSplit(request);
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    --- End diff --
   
    I am adding an option to the Hadoop configuration so that the Master sets it before calling `getSplits`.
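
One possible reading of this reply is that the master switches off fine-grained pruning in its own split computation, leaving that step to the worker's `tryPruneByFGDataMap`, so pruning happens exactly once. The sketch below illustrates only that idea. The key name is hypothetical (the thread does not show the actual option), and a plain `Map` stands in for Hadoop's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a Map stands in for Hadoop's Configuration. */
final class PruneOnceSketch {

  // Hypothetical key; the real option name is not shown in this thread.
  static final String FG_PRUNING_ON_MASTER = "example.search.fgdatamap.prune.on.master";

  /** Master side: switch off fine-grained pruning before computing splits. */
  static void configureMaster(Map<String, String> conf) {
    conf.put(FG_PRUNING_ON_MASTER, "false");
  }

  /** Split computation consults the flag; it prunes by default when the flag is unset. */
  static boolean masterShouldPruneByFgDataMap(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault(FG_PRUNING_ON_MASTER, "true"));
  }

  /** The worker prunes only when the master did not, so the step runs once. */
  static boolean workerShouldPruneByFgDataMap(Map<String, String> conf) {
    return !masterShouldPruneByFgDataMap(conf);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    configureMaster(conf);
    System.out.println("master prunes: " + masterShouldPruneByFgDataMap(conf));
    System.out.println("worker prunes: " + workerShouldPruneByFgDataMap(conf));
  }
}
```

Deriving both decisions from a single flag means the two sides cannot both prune, and cannot both skip pruning, for the same request.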


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181630182
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +import java.util.concurrent.{Executors, ExecutorService}
    +
    +import org.apache.spark.sql.{CarbonSession, SparkSession}
    +
    +import org.apache.carbondata.examples.util.ExampleUtils
    +
    +/**
    + * An example that demonstrates how to run queries in search mode,
    + * and compare the performance between search mode and SparkSQL
    + */
    +// scalastyle:off
    +object SearchModeExample {
    +
    +  def main(args: Array[String]) {
    +    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
    --- End diff --
   
    Please change the app name


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5037/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632817
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.carbondata.spark.rdd
     
     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +import java.net.InetAddress
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632873
 
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -26,7 +26,6 @@
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.cache.dictionary.Dictionary;
    -import org.apache.carbondata.core.constants.CarbonCommonConstants;
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632923
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.protocol.EchoRequest;
    +import org.apache.carbondata.store.protocol.EchoResponse;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.protocol.ShutdownRequest;
    +import org.apache.carbondata.store.protocol.ShutdownResponse;
    +import org.apache.carbondata.store.protocol.WorkerGrpc;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.protobuf.ByteString;
    +import io.grpc.ManagedChannel;
    +import io.grpc.ManagedChannelBuilder;
    +import io.grpc.Server;
    +import io.grpc.ServerBuilder;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It listens to {@link Master#DEFAULT_PORT} to wait for worker to register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +@InterfaceAudience.Internal
    +public class Master {
    +
    +  private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
    +
    +  public static final int DEFAULT_PORT = 10020;
    +
    +  private Server registryServer;
    +
    +  private int port;
    +
    +  private Random random = new Random();
    +
    +  /** mapping of worker hostname to rpc stub */
    +  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
    +
    +  public Master() {
    +    this(DEFAULT_PORT);
    +  }
    +
    +  public Master(int port) {
    +    this.port = port;
    +    this.workers = new ConcurrentHashMap<>();
    +  }
    +
    +  /** start service and listen on port passed in constructor */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +      /* The port on which the registryServer should run */
    +      registryServer = ServerBuilder.forPort(port)
    +          .addService(new RegistryService(this))
    +          .build()
    +          .start();
    +      LOG.info("Master started, listening on " + port);
    +      Runtime.getRuntime().addShutdownHook(new Thread() {
    +        @Override public void run() {
    +          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
    +          LOG.info("*** shutting down gRPC Master since JVM is shutting down");
    +          stopService();
    +          LOG.info("*** Master shut down");
    +        }
    +      });
    +    }
    +  }
    +
    +  public void stopService() {
    +    if (registryServer != null) {
    +      registryServer.shutdown();
    +    }
    +  }
    +
    +  public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
    +    ShutdownRequest request = ShutdownRequest.newBuilder()
    +        .setTrigger(ShutdownRequest.Trigger.USER)
    +        .build();
    +    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
    +      ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
    +      ShutdownResponse response = future.get();
    +      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
    +        LOG.error("failed to shutdown worker: " + response.getMessage());
    +        throw new IOException(response.getMessage());
    +      } else {
    +        workers.remove(worker.getKey());
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Await termination on the main thread since the grpc library uses daemon threads.
    +   */
    +  private void blockUntilShutdown() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.awaitTermination();
    +    }
    +  }
    +
    +  /** A new searcher is trying to register, add it to the map and connect to this searcher */
    +  void addWorker(String workerHostname, int port, int cores)
    +      throws ExecutionException, InterruptedException {
    +    Objects.requireNonNull(workerHostname);
    +
    +    LOG.info("trying to connect to searcher " + workerHostname + ":" + port);
    +    ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
    +        .usePlaintext(true)
    +        .maxInboundMessageSize(200 * 1000 * 1000)
    +        .build();
    +    WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
    +
    +    // try to send a message to worker as a test
    +    tryEcho(futureStub);
    +    workers.put(workerHostname, futureStub);
    +  }
    +
    +  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
    +      throws ExecutionException, InterruptedException {
    +    EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
    +    LOG.info("echo to searcher: " + request.getMessage());
    +    ListenableFuture<EchoResponse> response = stub.echo(request);
    +    try {
    +      LOG.info("echo from searcher: " + response.get().getMessage());
    +    } catch (InterruptedException | ExecutionException e) {
    +      LOG.error("failed to echo: " + e.getMessage());
    +      throw e;
    +    }
    +  }
    +
    +  /**
    +   * Execute search by firing RPC call to worker, return the result rows
    +   */
    +  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
    +      throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
    +    Objects.requireNonNull(table);
    +    Objects.requireNonNull(columns);
    +
    +    if (workers.size() == 0) {
    +      throw new IOException("No searcher is available");
    +    }
    +
    +    int queryId = random.nextInt();
    +
    +    // Build a SearchRequest
    +    SearchRequest.Builder builder = SearchRequest.newBuilder()
    +        .setQueryId(queryId)
    +        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
    +    for (String column : columns) {
    +      builder.addProjectColumns(column);
    +    }
    +    if (filter != null) {
    +      builder.setFilterExpression(GrpcSerdes.serialize(filter));
    +    }
    +
    +    // prune data and get a mapping of worker hostname to list of blocks,
    +    // add these blocks to the SearchRequest and fire the RPC call
    +    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
    +
    +    List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
    +
    +    for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
    +      String hostname = entry.getKey();
    +      List<Distributable> blocks = entry.getValue();
    +      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
    +      ByteArrayOutputStream stream = new ByteArrayOutputStream();
    +      DataOutput dataOutput = new DataOutputStream(stream);
    +      mbSplit.write(dataOutput);
    +      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
    +
    +      SearchRequest request = builder.build();
    +
    +      // do RPC to worker asynchronously and concurrently
    +      ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
    +      futures.add(future);
    +    }
    +
    +    // get all results from RPC response and return to caller
    +    List<CarbonRow> output = new LinkedList<>();
    +    for (ListenableFuture<SearchResult> future : futures) {
    +      SearchResult result = future.get();
    +      if (result.getQueryId() != queryId) {
    +        throw new IOException(String.format(
    +            "queryId in response does not match request: %d != %d", result.getQueryId(), queryId));
    +      }
    +      collectResult(result, output);
    +    }
    +    return output.toArray(new CarbonRow[output.size()]);
    +  }
    +
    +  /**
    +   * Prune data by using CarbonInputFormat.getSplit
    +   * Return a mapping of hostname to list of block
    +   */
    +  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
    +      Expression filter) throws IOException, InvalidConfigurationException {
    +    JobConf jobConf = new JobConf(new Configuration());
    +    Job job = new Job(jobConf);
    +    CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
    +        job, table, columns, filter, null, null);
    +
    +    List<InputSplit> splits = format.getSplits(job);
    +    List<Distributable> distributables = new ArrayList<>(splits.size());
    +    for (InputSplit split : splits) {
    +      distributables.add(((CarbonInputSplit)split));
    +    }
    +    return CarbonLoaderUtil.nodeBlockMapping(
    +        distributables, -1, new ArrayList<String>(workers.keySet()),
    +        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
    +  }
    +
    +  /**
    +   * Fill result row to {@param output}
    +   */
    +  private void collectResult(SearchResult result,  List<CarbonRow> output) throws IOException {
    +    for (ByteString bytes : result.getRowList()) {
    +      CarbonRow row = GrpcSerdes.deserialize(bytes);
    +      output.add(row);
    +    }
    +  }
    +
    +  /** return hostname of all workers */
    +  public Set<String> getWorkers() {
    +    return workers.keySet();
    +  }
    +
    +  public static void main(String[] args) throws IOException, InterruptedException {
    --- End diff --
   
    yes
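
    For readers following the quoted `search` method, its fan-out/collect pattern can be sketched with the JDK alone. This is an illustration, not the PR's code: `CompletableFuture` replaces the gRPC future stub, `workerSearch` is a hypothetical stand-in for the worker RPC, and rows are plain strings. As in the diff, one request is fired per host, and each reply is checked against the query id before its rows are collected.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ScatterGatherSketch {

  /** Hypothetical reply type: echoes the query id and carries result rows. */
  static final class Result {
    final int queryId;
    final List<String> rows;

    Result(int queryId, List<String> rows) {
      this.queryId = queryId;
      this.rows = rows;
    }
  }

  /** Stand-in for the worker RPC: returns one row per assigned block. */
  static Result workerSearch(int queryId, String host, List<String> blocks) {
    List<String> rows = new ArrayList<>();
    for (String block : blocks) {
      rows.add(host + ":" + block);
    }
    return new Result(queryId, rows);
  }

  /** Fire one asynchronous call per host, then gather rows in submission order. */
  static List<String> search(int queryId, Map<String, List<String>> nodeBlockMapping) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nodeBlockMapping.size()));
    try {
      List<CompletableFuture<Result>> futures = new ArrayList<>();
      for (Map.Entry<String, List<String>> entry : nodeBlockMapping.entrySet()) {
        final String host = entry.getKey();
        final List<String> blocks = entry.getValue();
        futures.add(CompletableFuture.supplyAsync(
            () -> workerSearch(queryId, host, blocks), pool));
      }
      List<String> output = new ArrayList<>();
      for (CompletableFuture<Result> future : futures) {
        Result result = future.join(); // blocks until this worker has replied
        if (result.queryId != queryId) {
          throw new IllegalStateException(String.format(
              "queryId in response does not match request: %d != %d",
              result.queryId, queryId));
        }
        output.addAll(result.rows);
      }
      return output;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> mapping = new LinkedHashMap<>();
    mapping.put("host1", List.of("block1", "block2"));
    mapping.put("host2", List.of("block3"));
    System.out.println(search(42, mapping));
  }
}
```

    All calls are submitted before any result is awaited, so the workers run concurrently and the total latency is bounded by the slowest worker rather than the sum of all of them.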


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181633019
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +import java.util.concurrent.{Executors, ExecutorService}
    +
    +import org.apache.spark.sql.{CarbonSession, SparkSession}
    +
    +import org.apache.carbondata.examples.util.ExampleUtils
    +
    +/**
    + * An example that demonstrates how to run queries in search mode,
    + * and compare the performance between search mode and SparkSQL
    + */
    +// scalastyle:off
    +object SearchModeExample {
    +
    +  def main(args: Array[String]) {
    +    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181633683
 
    --- Diff: pom.xml ---
    @@ -121,6 +122,8 @@
         <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
         <script.exetension>.sh</script.exetension>
         <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
    +    <grpc.version>1.10.0</grpc.version>
    +    <netty.version>4.0.43.Final</netty.version>
    --- End diff --
   
    Yes. I checked Spark 2.2, which uses Netty 4.0.43; see https://github.com/apache/spark/blob/branch-2.2/pom.xml#L556
   
    The carbon-store-search module does not depend on Spark, so I need to set the Netty version explicitly here.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5038/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3821/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5042/



---