[GitHub] carbondata pull request #2148: [CARBONDATA-2323][WIP] Distributed search mod...


[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    @ravipesala No, that is not possible in this version; we will need to change it in the next version.
    Spark 2.1 and 2.2 use Netty 4.0.x, while gRPC uses Netty 4.1.x, and the two are not compatible. Spark 2.3 has upgraded to Netty 4.1.x, so once we support Spark 2.3 we can use gRPC and will no longer depend on the Spark EndPoint framework.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182643919
 
    --- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.ExecutionContext.Implicits.global
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182643983
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -354,7 +365,9 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
    +    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    +        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    +        dataMapJob != null) {
    --- End diff --
   
    I changed the condition again; please check.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3929/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5153/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182684714
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    -        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    +        rows.add(reader.getCurrentValue());
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
         } finally {
           reader.close();
         }
    +    LOG.error("finished reading");
    --- End diff --
   
    Why is this logged at ERROR level?


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182689498
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    -        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    -        dataMapJob != null) {
    +    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
    +    if (dataMapJob != null &&
    +        distributedCG ||
    --- End diff --
   
    The logic was changed. It should be:
   
    `if (dataMapJob != null &&
        (distributedCG ||
         isFgDataMapPruningEnable(job.getConfiguration()) && dataMapLevel == DataMapLevel.FG))`
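
The point of this comment is Java operator precedence: `&&` binds tighter than `||`, so `a && b || c` parses as `(a && b) || c`. The sketch below illustrates that with a hypothetical `PruneCondition` class whose three booleans stand in for `dataMapJob != null`, `distributedCG`, and the FG check. The quoted diff is cut off after `||`, so treating the FG check as the third operand is an assumption.

```java
// Hypothetical stand-in for the condition under review; not CarbonData code.
final class PruneCondition {

  // Unparenthesized form: parsed as (hasJob && distributedCG) || fgCheck,
  // so the null check does not guard the FG alternative.
  static boolean withoutParens(boolean hasJob, boolean distributedCG, boolean fgCheck) {
    return hasJob && distributedCG || fgCheck;
  }

  // Parenthesized form: hasJob guards both alternatives.
  static boolean withParens(boolean hasJob, boolean distributedCG, boolean fgCheck) {
    return hasJob && (distributedCG || fgCheck);
  }

  public static void main(String[] args) {
    // No job available, but the FG check passes: the two forms disagree.
    System.out.println(withoutParens(false, false, true)); // prints "true"
    System.out.println(withParens(false, false, true));    // prints "false"
  }
}
```

With the unparenthesized form, the branch can be entered while the job is null, which is the case the explicit parentheses rule out.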


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182719725
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        rows.add(reader.getCurrentValue());
    +        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    +        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    --- End diff --
   
    This commit crashes the JVM on my local machine. After reverting it, the run succeeds.
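
For context, the code comment in the diff refers to a reader that may reuse the same buffer between rows. Here is a minimal sketch of that hazard, using a hypothetical `ReusingReader` (not CarbonData's reader or `CarbonRow`). It only shows why collected rows can alias each other; it is not an explanation of the crash reported here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of buffer reuse; all names are invented for this sketch.
final class BufferReuseDemo {

  // A reader that returns the SAME array instance for every row.
  static final class ReusingReader {
    private final Object[] buffer = new Object[1];
    private final int limit;
    private int next = 0;

    ReusingReader(int limit) {
      this.limit = limit;
    }

    boolean nextKeyValue() {
      if (next >= limit) {
        return false;
      }
      buffer[0] = next++; // overwrite the shared buffer in place
      return true;
    }

    Object[] getCurrentValue() {
      return buffer;
    }
  }

  // Stores the reader's array directly: every list entry aliases one buffer.
  static List<Object[]> readWithoutCopy(int n) {
    ReusingReader reader = new ReusingReader(n);
    List<Object[]> rows = new ArrayList<>();
    while (reader.nextKeyValue()) {
      rows.add(reader.getCurrentValue());
    }
    return rows;
  }

  // Copies the array contents before storing, so each row keeps its own values.
  static List<Object[]> readWithCopy(int n) {
    ReusingReader reader = new ReusingReader(n);
    List<Object[]> rows = new ArrayList<>();
    while (reader.nextKeyValue()) {
      Object[] current = reader.getCurrentValue();
      rows.add(Arrays.copyOf(current, current.length));
    }
    return rows;
  }

  public static void main(String[] args) {
    System.out.println(readWithoutCopy(3).get(0)[0]); // prints "2" (last value written)
    System.out.println(readWithCopy(3).get(0)[0]);    // prints "0"
  }
}
```

Note that the copy in this sketch duplicates the array contents. Whether wrapping the data in a new row object is enough depends on what the real reader reuses, which the thread does not state.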


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182754423
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
   
    Use CarbonThreadFactory to assign a pool name when creating the executor service.



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182777721
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    -        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    +        rows.add(reader.getCurrentValue());
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
         } finally {
           reader.close();
         }
    +    LOG.error("finished reading");
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182778652
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    -        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    -        dataMapJob != null) {
    +    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
    +    if (dataMapJob != null &&
    +        distributedCG ||
    --- End diff --
   
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182779690
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
   
    But CarbonThreadFactory only creates threads, not a thread pool. How can I use it?
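
One possible answer, sketched under assumptions: the JDK's `Executors.newFixedThreadPool(int, ThreadFactory)` overload accepts a thread factory, so a factory that only creates threads can still give the pool's threads a name. `NamedThreadFactory` below is a hypothetical stand-in, not the real `CarbonThreadFactory`. It assumes that class implements `java.util.concurrent.ThreadFactory`, which this thread does not confirm.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; class and pool names are invented for illustration.
final class NamedPoolDemo {

  // Stand-in factory: creates threads named "<poolName>-thread-<n>".
  static final class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
      this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, poolName + "-thread-" + counter.getAndIncrement());
    }
  }

  // The pool is still built by Executors; the factory only supplies its threads.
  static ExecutorService newNamedPool(int nThread, String poolName) {
    return Executors.newFixedThreadPool(nThread, new NamedThreadFactory(poolName));
  }

  // Runs a task on the pool and reports the name of the worker thread that ran it.
  static String nameOfWorker(ExecutorService pool) {
    try {
      return pool.submit(() -> Thread.currentThread().getName()).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = newNamedPool(2, "search-scan");
    try {
      System.out.println(nameOfWorker(pool));
    } finally {
      pool.shutdown();
    }
  }
}
```

Named threads make thread dumps and log lines easier to attribute to the search-mode scan pool, which is presumably the motivation for the review request.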


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182928852
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        rows.add(reader.getCurrentValue());
    +        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    +        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
    +    } finally {
    --- End diff --
   
    I tested it: with these two lines added, the JVM crashes; without them, it runs fine.
    I don't know the reason, though.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182930836
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
    @@ -417,7 +417,7 @@ object ConcurrentQueryBenchmark {
        */
       def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
         // run queries on parquet and carbon
    -    runQueries(spark, table1)
    +    //runQueries(spark, table1)
    --- End diff --
   
    We should keep it.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4029/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4036/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182963146
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
    @@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util
     
     import java.io.File
     
    +import org.apache.spark.SparkConf
    --- End diff --
   
    Not needed.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4039/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5230/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183005257
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
    @@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util
     
     import java.io.File
     
    +import org.apache.spark.SparkConf
    --- End diff --
   
    fixed


---