[GitHub] carbondata pull request #2148: [CARBONDATA-2323][WIP] Distributed search mod...


[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4058/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    retest this please


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183063006
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
   
    Yes, when creating the thread pool you can pass a thread factory object to assign a pool name, for example:
        ExecutorService executorService = Executors.newFixedThreadPool(1,
            new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
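A minimal, self-contained sketch of the idea in this thread: parse the scan-thread count with a fallback default (as `initThreadPool()` does on `NumberFormatException`) and give the fixed pool a named thread factory. `NamedThreadFactory`, `parseThreadCount`, `createScanPool` and the pool name `SearchModeScanPool` are hypothetical names for illustration only; this is not CarbonData's `CarbonThreadFactory` or its actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolSketch {

  // HYPOTHETICAL helper (not CarbonData's CarbonThreadFactory): names threads "<poolName>-<n>".
  static final class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String poolName) {
      this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r, poolName + "-" + counter.getAndIncrement());
      t.setDaemon(true); // do not keep the JVM alive just for idle scan threads
      return t;
    }
  }

  // Mirrors the NumberFormatException fallback in initThreadPool().
  static int parseThreadCount(String configured, int defaultValue) {
    try {
      return Integer.parseInt(configured);
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  // Executors.newFixedThreadPool throws IllegalArgumentException for a count <= 0;
  // the quoted diff guards pool creation with nThread > 0.
  static ExecutorService createScanPool(String configured, int defaultValue) {
    int nThread = parseThreadCount(configured, defaultValue);
    return Executors.newFixedThreadPool(nThread, new NamedThreadFactory("SearchModeScanPool"));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = createScanPool("not-a-number", 4);
    String name = pool.submit(() -> Thread.currentThread().getName()).get();
    System.out.println(name); // prints SearchModeScanPool-0
    pool.shutdown();
  }
}
```

Named threads mainly pay off in thread dumps and log lines, where a generic `pool-3-thread-1` gives no hint which component owns the thread.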


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183071058
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +
    +import org.apache.spark.search.SearchRequest;
    +import org.apache.spark.search.SearchResult;
    +import org.apache.spark.search.ShutdownRequest;
    +import org.apache.spark.search.ShutdownResponse;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +public class SearchRequestHandler {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +
    +  public SearchResult handleSearch(SearchRequest request) {
    +    try {
    +      List<CarbonRow> rows = handleRequest(request);
    +      return createSuccessResponse(request, rows);
    +    } catch (IOException | InterruptedException e) {
    +      LOG.error(e);
    +      return createFailureResponse(request, e);
    +    }
    +  }
    +
    +  public ShutdownResponse handleShutdown(ShutdownRequest request) {
    +    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchRequest request)
    +      throws IOException, InterruptedException {
    +    TableInfo tableInfo = request.tableInfo();
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +    CarbonMultiBlockSplit mbSplit = request.split().value();
    +    long limit = request.limit();
    +    long rowCount = 0;
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    +
    +    // In search mode, reader will read multiple blocks by using a thread pool
    +    CarbonRecordReader<CarbonRow> reader =
    +        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
    +    reader.initialize(mbSplit, null);
    +
    +    // read all rows by the reader
    +    List<CarbonRow> rows = new LinkedList<>();
    +    try {
    +      while (reader.nextKeyValue() && rowCount < limit) {
    --- End diff --
   
    We can set the default value of limit to -1 (for non-limit queries) and update the code as follows:
     while (reader.nextKeyValue() && (limit == -1 || rowCount < limit))
    I think the above code will be more readable.
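For -1 to mean "no limit", the guard has to be `limit == -1 || rowCount < limit`; written as `limit != -1 || ...` it would ignore any real limit and read nothing when limit is -1. A minimal sketch of the -1 sentinel approach follows; a plain `Iterator` stands in for `CarbonRecordReader`, and `NO_LIMIT` / `readWithLimit` are hypothetical names. It also evaluates the limit before advancing the source: in the quoted diff, `reader.nextKeyValue()` runs first, so the reader advances one record past the limit before the loop exits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LimitLoopSketch {

  // Sentinel for "no LIMIT clause", as proposed in the review comment.
  static final long NO_LIMIT = -1L;

  // A plain Iterator stands in for CarbonRecordReader (hypothetical simplification).
  static <T> List<T> readWithLimit(Iterator<T> source, long limit) {
    List<T> rows = new ArrayList<>();
    long rowCount = 0;
    // Check the limit first so no extra row is pulled from the source
    // once the limit has been reached.
    while ((limit == NO_LIMIT || rowCount < limit) && source.hasNext()) {
      rows.add(source.next());
      rowCount++;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Integer> data = Arrays.asList(1, 2, 3, 4);
    System.out.println(readWithLimit(data.iterator(), 2));        // [1, 2]
    System.out.println(readWithLimit(data.iterator(), NO_LIMIT)); // [1, 2, 3, 4]
  }
}
```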
   



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183077175
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
    @@ -110,6 +149,69 @@ class CarbonSession(@transient val sc: SparkContext,
           }
         }
       }
    +
    +  /**
    +   * If the query is a simple query with filter, we will try to use Search Mode,
    +   * otherwise execute in SparkSQL
    +   */
    +  private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
    +    val analyzed = qe.analyzed
    +    analyzed match {
    +      case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
    +        if s.child.isInstanceOf[LogicalRelation] &&
    +           s.child.asInstanceOf[LogicalRelation].relation
    +             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation])
    +      case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias))))
    +        if s.child.isInstanceOf[LogicalRelation] &&
    +           s.child.asInstanceOf[LogicalRelation].relation
    +             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        val logicalRelation = s.child.asInstanceOf[LogicalRelation]
    +        runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows)
    +      case _ =>
    +        new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
    +    }
    +  }
    +
    +  private var carbonStore: SparkCarbonStore = _
    +
    +  def startSearchMode(): Unit = {
    +    CarbonProperties.enableSearchMode(true)
    +    if (carbonStore == null) {
    +      carbonStore = new SparkCarbonStore(this)
    +      carbonStore.startSearchMode()
    +    }
    +  }
    +
    +  def stopSearchMode(): Unit = {
    +    CarbonProperties.enableSearchMode(false)
    +    if (carbonStore != null) {
    +      carbonStore.stopSearchMode()
    +      carbonStore = null
    +    }
    +  }
    +
    +  private def runSearch(
    +      logicalPlan: LogicalPlan,
    +      columns: Seq[NamedExpression],
    +      expr: Expression,
    +      relation: LogicalRelation,
    +      maxRows: Option[Long] = None,
    +      localMaxRows: Option[Long] = None): DataFrame = {
    +    val rows = carbonStore.search(
    +        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable,
    +        columns.map(_.name).toArray,
    +        if (expr != null) CarbonFilters.transformExpression(expr) else null,
    +        maxRows.getOrElse(Long.MaxValue),
    +        localMaxRows.getOrElse(Long.MaxValue))
    --- End diff --
   
    I have added Long.MaxValue here; I think it is better than using -1.
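A rough Java analogue of the `maxRows.getOrElse(Long.MaxValue)` choice in `runSearch`: an absent limit becomes `Long.MAX_VALUE`, so limited and unlimited queries share one code path with no sentinel check. As a side benefit, standard APIs such as `Stream.limit` reject negative sizes, so a -1 sentinel could not be passed through unchanged. `effectiveLimit` and `take` are hypothetical names; this is a sketch, not the PR's code.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MaxValueLimitSketch {

  // Absent LIMIT maps to Long.MAX_VALUE, analogous to maxRows.getOrElse(Long.MaxValue).
  static long effectiveLimit(Optional<Long> maxRows) {
    return maxRows.orElse(Long.MAX_VALUE);
  }

  // One code path for limited and unlimited queries; no special-casing of a sentinel.
  // Stream.limit throws IllegalArgumentException for negative sizes, so -1 would not work here.
  static <T> List<T> take(Stream<T> rows, Optional<Long> maxRows) {
    return rows.limit(effectiveLimit(maxRows)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(take(Stream.of("a", "b", "c"), Optional.of(2L)));  // [a, b]
    System.out.println(take(Stream.of("a", "b", "c"), Optional.empty())); // [a, b, c]
  }
}
```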


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183077945
 
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    +      while (reader.nextKeyValue() && rowCount < limit) {
    --- End diff --
   
    I think using Long.MaxValue is better than using -1, since it keeps the code unified.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5248/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4071/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4073/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5254/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4080/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5261/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4085/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5267/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4092/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5275/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

qiuchenjian-2
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2148


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4102/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5282/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
 
    SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4452/



---