Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2148

@ravipesala No, it is not possible in this version; we need to change it in the next version. In Spark 2.1 and 2.2, Spark uses Netty 4.0.x while gRPC uses 4.1.x, and they are not compatible. Spark 2.3 has upgraded to Netty 4.1.x, so once we support Spark 2.3 we can use gRPC and no longer depend on the Spark EndPoint framework.

---
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182643919

--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.net.InetAddress
+import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
--- End diff --

fixed

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182643983

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -354,7 +365,9 @@ protected Expression getFilterPredicates(Configuration configuration) {
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
+        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
+        dataMapJob != null) {
--- End diff --

I changed the condition again, please check

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3929/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5153/

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182684714

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
     List<CarbonRow> rows = new LinkedList<>();
     try {
       while (reader.nextKeyValue()) {
-        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
-        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
+        rows.add(reader.getCurrentValue());
       }
     } catch (InterruptedException e) {
       throw new IOException(e);
     } finally {
       reader.close();
     }
+    LOG.error("finished reading");
--- End diff --

Why is this log at ERROR level?

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182689498

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
-        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
-        dataMapJob != null) {
+    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
+    if (dataMapJob != null &&
+        distributedCG ||
--- End diff --

The logic was changed; it should be `if (dataMapJob != null && (distributedCG || isFgDataMapPruningEnable(job.getConfiguration()) && dataMapLevel == DataMapLevel.FG))`

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182719725

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
     List<CarbonRow> rows = new LinkedList<>();
     try {
       while (reader.nextKeyValue()) {
-        rows.add(reader.getCurrentValue());
+        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
+        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
--- End diff --

This commit leads to a JVM crash on my local machine. After reverting it, it runs successfully.

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182754423

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
@@ -31,35 +31,53 @@
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;

+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;

   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+          .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
               CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
-      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
+      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. " +
+          "Using the default value " + nThread);
     }
     if (nThread > 0) {
-      executorService = Executors.newFixedThreadPool(nThread);
+      executorService = Executors.newFixedThreadPool(nThread);
--- End diff --

Use CarbonThreadFactory to assign a pool name while creating the executor service.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182777721

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
     List<CarbonRow> rows = new LinkedList<>();
     try {
       while (reader.nextKeyValue()) {
-        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
-        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
+        rows.add(reader.getCurrentValue());
       }
     } catch (InterruptedException e) {
       throw new IOException(e);
     } finally {
       reader.close();
     }
+    LOG.error("finished reading");
--- End diff --

fixed

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182778652

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
-        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
-        dataMapJob != null) {
+    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
+    if (dataMapJob != null &&
+        distributedCG ||
--- End diff --

fixed

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182779690

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
@@ -31,35 +31,53 @@
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;

+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;

   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+          .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
               CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
-      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
+      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. " +
+          "Using the default value " + nThread);
     }
     if (nThread > 0) {
-      executorService = Executors.newFixedThreadPool(nThread);
+      executorService = Executors.newFixedThreadPool(nThread);
--- End diff --

But CarbonThreadFactory only creates a Thread, not a ThreadPool. How can I use it?

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182928852

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
     List<CarbonRow> rows = new LinkedList<>();
     try {
       while (reader.nextKeyValue()) {
-        rows.add(reader.getCurrentValue());
+        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
+        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
       }
     } catch (InterruptedException e) {
       throw new IOException(e);
+    } finally {
--- End diff --

I tested it: if these two lines are added, the JVM crashes; if not, it is OK. But I don't know the reason.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182930836

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
@@ -417,7 +417,7 @@
    */
   def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
     // run queries on parquet and carbon
-    runQueries(spark, table1)
+    //runQueries(spark, table1)
--- End diff --

We should keep it.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Success with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4029/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Success with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4036/

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r182963146

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
@@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util

 import java.io.File

+import org.apache.spark.SparkConf
--- End diff --

not needed

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4039/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2148

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5230/

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r183005257

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
@@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util

 import java.io.File

+import org.apache.spark.SparkConf
--- End diff --

fixed

---