Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4058/ ---
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2148 retest this please ---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r183063006

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
@@ -31,35 +31,53 @@
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;
   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+          .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
-      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
+      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
+          + "Using the default value " + nThread);
     }
     if (nThread > 0) {
-      executorService = Executors.newFixedThreadPool(nThread);
+      executorService = Executors.newFixedThreadPool(nThread);
--- End diff --

Yes, in the thread pool you can pass a thread factory object and assign a pool name:

    ExecutorService executorService = Executors.newFixedThreadPool(1,
        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));

---
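For readers following the suggestion above, a minimal, self-contained sketch of the thread-factory naming pattern; NamedThreadFactory and the "SearchModeScan" pool name here are illustrative stand-ins for the project's CarbonThreadFactory:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedThreadFactory implements ThreadFactory {
      private final String poolName;
      private final AtomicInteger counter = new AtomicInteger(1);

      public NamedThreadFactory(String poolName) {
        this.poolName = poolName;
      }

      @Override
      public Thread newThread(Runnable runnable) {
        // Name each thread after the pool so it is identifiable in thread dumps
        return new Thread(runnable, poolName + "-" + counter.getAndIncrement());
      }

      public static void main(String[] args) {
        // Usage: every worker thread created by this pool carries the pool name
        ExecutorService pool =
            Executors.newFixedThreadPool(4, new NamedThreadFactory("SearchModeScan"));
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
      }
    }

---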
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r183071058

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.spark.search.SearchRequest;
+import org.apache.spark.search.SearchResult;
+import org.apache.spark.search.ShutdownRequest;
+import org.apache.spark.search.ShutdownResponse;
+
+/**
+ * Thread runnable for handling SearchRequest from master.
+ */
+@InterfaceAudience.Internal
+public class SearchRequestHandler {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
+
+  public SearchResult handleSearch(SearchRequest request) {
+    try {
+      List<CarbonRow> rows = handleRequest(request);
+      return createSuccessResponse(request, rows);
+    } catch (IOException | InterruptedException e) {
+      LOG.error(e);
+      return createFailureResponse(request, e);
+    }
+  }
+
+  public ShutdownResponse handleShutdown(ShutdownRequest request) {
+    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+  }
+
+  /**
+   * Builds {@link QueryModel} and read data from files
+   */
+  private List<CarbonRow> handleRequest(SearchRequest request)
+      throws IOException, InterruptedException {
+    TableInfo tableInfo = request.tableInfo();
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    QueryModel queryModel = createQueryModel(table, request);
+    CarbonMultiBlockSplit mbSplit = request.split().value();
+    long limit = request.limit();
+    long rowCount = 0;
+
+    // If there is FGDataMap, prune the split by applying FGDataMap
+    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+
+    // In search mode, reader will read multiple blocks by using a thread pool
+    CarbonRecordReader<CarbonRow> reader =
+        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
+    reader.initialize(mbSplit, null);
+
+    // read all rows by the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try {
+      while (reader.nextKeyValue() && rowCount < limit) {
--- End diff --

We can set the default value of limit to -1 (for non-limit queries) and update the code as follows:

    while (reader.nextKeyValue() && (limit == -1 || rowCount < limit))

I think the above code will be more readable.

---
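A minimal sketch to make the -1 convention concrete; the record reader is simulated with a plain Iterator, and all names are illustrative rather than the PR's actual API:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    public class MinusOneLimitSketch {
      // With -1 meaning "no limit", the loop needs an explicit sentinel branch:
      // stop early only when a real limit is set and rowCount has reached it.
      static <T> List<T> readRows(Iterator<T> reader, long limit) {
        List<T> rows = new LinkedList<>();
        long rowCount = 0;
        while (reader.hasNext() && (limit == -1 || rowCount < limit)) {
          rows.add(reader.next());
          rowCount++;
        }
        return rows;
      }

      public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(readRows(data.iterator(), 3));   // [1, 2, 3]
        System.out.println(readRows(data.iterator(), -1));  // [1, 2, 3, 4, 5]
      }
    }

---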
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r183077175

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
@@ -110,6 +149,69 @@ class CarbonSession(@transient val sc: SparkContext,
       }
     }
   }
+
+  /**
+   * If the query is a simple query with filter, we will try to use Search Mode,
+   * otherwise execute in SparkSQL
+   */
+  private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
+    val analyzed = qe.analyzed
+    analyzed match {
+      case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
+        if s.child.isInstanceOf[LogicalRelation] &&
+           s.child.asInstanceOf[LogicalRelation].relation
+             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation])
+      case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias))))
+        if s.child.isInstanceOf[LogicalRelation] &&
+           s.child.asInstanceOf[LogicalRelation].relation
+             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val logicalRelation = s.child.asInstanceOf[LogicalRelation]
+        runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows)
+      case _ =>
+        new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+    }
+  }
+
+  private var carbonStore: SparkCarbonStore = _
+
+  def startSearchMode(): Unit = {
+    CarbonProperties.enableSearchMode(true)
+    if (carbonStore == null) {
+      carbonStore = new SparkCarbonStore(this)
+      carbonStore.startSearchMode()
+    }
+  }
+
+  def stopSearchMode(): Unit = {
+    CarbonProperties.enableSearchMode(false)
+    if (carbonStore != null) {
+      carbonStore.stopSearchMode()
+      carbonStore = null
+    }
+  }
+
+  private def runSearch(
+      logicalPlan: LogicalPlan,
+      columns: Seq[NamedExpression],
+      expr: Expression,
+      relation: LogicalRelation,
+      maxRows: Option[Long] = None,
+      localMaxRows: Option[Long] = None): DataFrame = {
+    val rows = carbonStore.search(
+      relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable,
+      columns.map(_.name).toArray,
+      if (expr != null) CarbonFilters.transformExpression(expr) else null,
+      maxRows.getOrElse(Long.MaxValue),
+      localMaxRows.getOrElse(Long.MaxValue))
--- End diff --

I have added Long.MaxValue here; I think it is better than using -1.

---
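A sketch of why the Long.MaxValue convention keeps the loop uniform: rowCount can never realistically reach Long.MaxValue, so a single bound check covers both the limited and unlimited cases with no sentinel branch. As above, the reader is simulated with a plain Iterator and all names are illustrative:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    public class MaxValueLimitSketch {
      // limit == Long.MaxValue means "no limit"; no special case is needed
      // because rowCount < Long.MaxValue is effectively always true.
      static <T> List<T> readRows(Iterator<T> reader, long limit) {
        List<T> rows = new LinkedList<>();
        long rowCount = 0;
        while (reader.hasNext() && rowCount < limit) {
          rows.add(reader.next());
          rowCount++;
        }
        return rows;
      }

      public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(readRows(data.iterator(), 3));              // [1, 2, 3]
        System.out.println(readRows(data.iterator(), Long.MaxValue));  // [1, 2, 3, 4, 5]
      }
    }

---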
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r183077945

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.spark.search.SearchRequest;
+import org.apache.spark.search.SearchResult;
+import org.apache.spark.search.ShutdownRequest;
+import org.apache.spark.search.ShutdownResponse;
+
+/**
+ * Thread runnable for handling SearchRequest from master.
+ */
+@InterfaceAudience.Internal
+public class SearchRequestHandler {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
+
+  public SearchResult handleSearch(SearchRequest request) {
+    try {
+      List<CarbonRow> rows = handleRequest(request);
+      return createSuccessResponse(request, rows);
+    } catch (IOException | InterruptedException e) {
+      LOG.error(e);
+      return createFailureResponse(request, e);
+    }
+  }
+
+  public ShutdownResponse handleShutdown(ShutdownRequest request) {
+    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+  }
+
+  /**
+   * Builds {@link QueryModel} and read data from files
+   */
+  private List<CarbonRow> handleRequest(SearchRequest request)
+      throws IOException, InterruptedException {
+    TableInfo tableInfo = request.tableInfo();
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    QueryModel queryModel = createQueryModel(table, request);
+    CarbonMultiBlockSplit mbSplit = request.split().value();
+    long limit = request.limit();
+    long rowCount = 0;
+
+    // If there is FGDataMap, prune the split by applying FGDataMap
+    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+
+    // In search mode, reader will read multiple blocks by using a thread pool
+    CarbonRecordReader<CarbonRow> reader =
+        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
+    reader.initialize(mbSplit, null);
+
+    // read all rows by the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try {
+      while (reader.nextKeyValue() && rowCount < limit) {
--- End diff --

I think using Long.MaxValue is better than using -1; the code stays unified.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5248/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4071/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4073/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5254/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4080/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5261/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4085/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5267/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4092/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5275/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4102/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5282/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2148 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4452/ ---