GitHub user ravipesala opened a pull request:
https://github.com/apache/incubator-carbondata/pull/605
[CARBONDATA-715] Optimize Single-Pass data load flow
1. Upgrade to the latest netty-4.1.8.
2. Optimize the serialization of the key passed over the network.
3. Launch an individual dictionary client for each loading thread.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ravipesala/incubator-carbondata single-pass-opt
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-carbondata/pull/605.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #605

----
commit 049378adebd5ed24a2dd2d2c4d91e08d99a2c0bc
Author: ravipesala <[hidden email]>
Date: 2017-02-21T01:15:09Z
Optimized and upgraded Dictionary Server
commit 8c9d84eb0eef4b4a17c46b7e40fb8e408ea4d3d0
Author: ravipesala <[hidden email]>
Date: 2017-02-21T05:29:37Z
Added multi dictionary clients for each thread

----
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at [hidden email] or file a JIRA ticket with INFRA. ---
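The second item in the description, serializing the key by hand instead of with Kryo, amounts to writing each field into a byte buffer in a fixed order and reading it back in the same order (the diffs in this thread replace `KryoRegister.serialize(key)` with `key.writeData(buffer)`). The sketch below illustrates the idea under stated assumptions: `DictionaryKeySketch` is a hypothetical stand-in for `DictionaryKey`, the `columnName` field and the byte layout are invented for illustration, and `java.nio.ByteBuffer` replaces Netty's `ByteBuf` so the example runs with the JDK alone.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for DictionaryKey. `data`, `type` and `dictionaryValue`
// mirror fields visible in the PR diffs; `columnName` is an assumption.
class DictionaryKeySketch {
  String columnName;
  String data;
  byte type;                 // message type encoded as a single byte
  int dictionaryValue = -1;  // placeholder until the server fills it in

  // Assumed layout: [type:1][dictionaryValue:4][len:4][columnName][len:4][data]
  ByteBuffer writeData() {
    byte[] column = columnName.getBytes(StandardCharsets.UTF_8);
    byte[] value = data.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 4 + column.length + 4 + value.length);
    buf.put(type).putInt(dictionaryValue);
    buf.putInt(column.length).put(column);
    buf.putInt(value.length).put(value);
    buf.flip(); // switch from writing to reading so the bytes can be sent
    return buf;
  }

  // Reads the fields back in exactly the order writeData() wrote them.
  static DictionaryKeySketch readData(ByteBuffer buf) {
    DictionaryKeySketch key = new DictionaryKeySketch();
    key.type = buf.get();
    key.dictionaryValue = buf.getInt();
    key.columnName = readString(buf);
    key.data = readString(buf);
    return key;
  }

  // Reads a 4-byte length followed by that many UTF-8 bytes.
  private static String readString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

For a key with column `city` and value `Paris`, this layout takes 22 bytes; the trade-off of any hand-written format is that reader and writer must agree on the field order.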
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/605
Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/921/
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/605
Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/922/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/605
Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/924/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102358591
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
@@ -44,7 +40,7 @@
 private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
- private ClientBootstrap clientBootstrap;
+ private NioEventLoopGroup workerGroup;
--- End diff --
This can be a local variable in `startClient`.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102358957
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
@@ -86,7 +78,11 @@ public DictionaryKey getDictionary(DictionaryKey key) {
 * shutdown dictionary client
 */
 public void shutDown() {
- clientBootstrap.releaseExternalResources();
- clientBootstrap.shutdown();
+ workerGroup.shutdownGracefully();
--- End diff --
How about doing it like the following:
```
if (clientBootstrap != null) {
  if (clientBootstrap.group() != null) {
    clientBootstrap.group().shutdownGracefully();
  }
  clientBootstrap = null;
}
```
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102359538
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -16,57 +16,59 @@
 */
 package org.apache.carbondata.core.dictionary.client;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 /**
 * Client handler to get data.
 */
-public class DictionaryClientHandler extends SimpleChannelHandler {
+public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
 private static final LogService LOGGER = LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
- final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
+ final BlockingQueue<DictionaryKey> dictKeyQueue = new LinkedBlockingQueue<>();
--- End diff --
I suggest renaming it to `responseMsgQueue`.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102360297
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
 */
 public DictionaryKey getDictionary(DictionaryKey key) {
 DictionaryKey dictionaryKey;
- BlockingQueue<DictionaryKey> dictKeyQueue = null;
 try {
- synchronized (lock) {
- dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- if (dictKeyQueue == null) {
- dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
- dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
- }
- }
- byte[] serialize = KryoRegister.serialize(key);
- ctx.getChannel().write(serialize);
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer);
--- End diff --
It would be better to handle the failure case with `addListener`:
```
ChannelFuture f = channel.writeAndFlush(request);
f.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      LOGGER.info("sent success");
    } else {
      LOGGER.info("sent failed");
    }
  }
});
```
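The point of `addListener` is that `writeAndFlush` only starts an asynchronous write, so success or failure has to be observed in a callback rather than assumed. The same pattern is sketched here with the JDK's `CompletableFuture.whenComplete` as a stand-in for Netty's `ChannelFuture.addListener`, so it runs without Netty; `WriteListenerSketch` and `sendAsync` are hypothetical, and `sendAsync` merely simulates a write that succeeds or fails immediately.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WriteListenerSketch {
  // Hypothetical stand-in for channel.writeAndFlush(request): the returned future
  // completes normally on success and exceptionally when the write fails.
  static CompletableFuture<Void> sendAsync(byte[] payload, boolean simulateFailure) {
    CompletableFuture<Void> f = new CompletableFuture<>();
    if (simulateFailure) {
      f.completeExceptionally(new IOException("connection reset"));
    } else {
      f.complete(null);
    }
    return f;
  }

  // Registers a completion callback, analogous to ChannelFuture.addListener(...).
  // The simulated future is already complete, so the callback runs immediately here;
  // with a real asynchronous write it would run later, when the write finishes.
  static String sendAndReport(byte[] payload, boolean simulateFailure) {
    AtomicReference<String> status = new AtomicReference<>();
    sendAsync(payload, simulateFailure).whenComplete((ignored, cause) -> {
      if (cause == null) {
        status.set("sent success");
      } else {
        // A real client would log at error level and fail the waiting request here.
        status.set("sent failed: " + cause.getMessage());
      }
    });
    return status.get();
  }
}
```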
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102361729
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
 */
 public DictionaryKey getDictionary(DictionaryKey key) {
 DictionaryKey dictionaryKey;
- BlockingQueue<DictionaryKey> dictKeyQueue = null;
 try {
- synchronized (lock) {
- dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- if (dictKeyQueue == null) {
- dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
- dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
- }
- }
- byte[] serialize = KryoRegister.serialize(key);
- ctx.getChannel().write(serialize);
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer);
 } catch (Exception e) {
- LOGGER.error("Error while send request to server " + e.getMessage());
- ctx.getChannel().close();
+ LOGGER.error(e, "Error while send request to server " + e.getMessage());
+ ctx.close();
 }
 boolean interrupted = false;
 try {
--- End diff --
There is no timeout control here. How about using `SettableFuture` from the Guava library, like:
```
final SettableFuture<Integer> dictionaryKey = SettableFuture.create();
ResponseCallback callback = new ResponseCallback() {
  @Override
  public void invoke(NettyMessage response) {
    if (response instanceof DictGenResponseMessage) {
      DictGenResponseMessage resp = (DictGenResponseMessage) response;
      dictionaryKey.set(resp.getDictionaryKey());
    }
  }
};
clientHandler.addRequest(request.id(), callback);
ChannelFuture f = channel.writeAndFlush(request);
try {
  return dictionaryKey.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
  LOGGER.error("exception while generating dictionary key from Dictionary Service: " + e.getMessage());
  throw e;
}
```
This requires registering a callback in the handler and invoking it from the handler's `channelRead0`. Then the blocking queue is no longer needed.
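The callback-plus-future idea can be sketched with the JDK's `CompletableFuture` in place of Guava's `SettableFuture`: each request gets an id, the handler completes the matching future when the response arrives, and the calling thread waits with a bounded `get(timeout)`. This is a minimal sketch, not CarbonData code; `PendingRequests`, `register`, `onResponse`, and `await`, as well as the assumption that the server echoes the request id in its response, are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
  private final AtomicLong nextId = new AtomicLong();
  private final Map<Long, CompletableFuture<Integer>> pending = new ConcurrentHashMap<>();

  // Called by the client before writing a request; returns the id to put on the wire.
  long register() {
    long id = nextId.incrementAndGet();
    pending.put(id, new CompletableFuture<>());
    return id;
  }

  // Called from the handler's read callback when a response for `id` arrives.
  // Responses for unknown or already timed-out ids are ignored.
  void onResponse(long id, int dictionaryValue) {
    CompletableFuture<Integer> f = pending.get(id);
    if (f != null) {
      f.complete(dictionaryValue);
    }
  }

  // Blocks the loading thread for at most timeoutMs instead of waiting forever.
  int await(long id, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletableFuture<Integer> f = pending.get(id);
    if (f == null) {
      throw new IllegalStateException("unknown request id " + id);
    }
    try {
      return f.get(timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      pending.remove(id); // drop the entry whether a value arrived or we timed out
    }
  }
}
```

Because each loading thread waits only on its own future, no per-thread queue is needed, and a late response that arrives after a timeout finds no entry and is dropped.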
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363218
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java ---
@@ -16,12 +16,12 @@
 */
 package org.apache.carbondata.core.dictionary.generator.key;
-import java.io.Serializable;
+import io.netty.buffer.ByteBuf;
 /**
 * Dictionary key to generate dictionary
 */
-public class DictionaryKey implements Serializable {
+public class DictionaryKey {
--- End diff --
How about separating the request and the response so that the response is lighter? Or leave it for now, and I can do it in another PR.
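A separate response message only has to carry enough to match it to its request and return the generated value. As a hypothetical sketch (`DictionaryResponse`, `requestId`, and the 12-byte layout are assumptions, not the PR's design), such a response encodes to a fixed size no matter how long the original column value was:

```java
import java.nio.ByteBuffer;

// Hypothetical response message: only the request id and the generated value travel back.
final class DictionaryResponse {
  static final int ENCODED_SIZE = Long.BYTES + Integer.BYTES; // 8 + 4 = 12 bytes

  final long requestId;
  final int dictionaryValue;

  DictionaryResponse(long requestId, int dictionaryValue) {
    this.requestId = requestId;
    this.dictionaryValue = dictionaryValue;
  }

  ByteBuffer encode() {
    ByteBuffer buf = ByteBuffer.allocate(ENCODED_SIZE);
    buf.putLong(requestId).putInt(dictionaryValue);
    buf.flip(); // ready for reading / sending
    return buf;
  }

  static DictionaryResponse decode(ByteBuffer buf) {
    long id = buf.getLong();
    int value = buf.getInt();
    return new DictionaryResponse(id, value);
  }
}
```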
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363432
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKeyType.java ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+/**
+ * Dictionary key types.
+ */
+public enum DictionaryKeyType {
--- End diff --
This is the message type, right?
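If `DictionaryKeyType` is really the message type, one option is to give each constant an explicit one-byte wire code instead of relying on `ordinal()` or the constant name, so that reordering or renaming constants does not silently change the protocol. A minimal sketch: only `WRITE_DICTIONARY` appears in this PR; the enum name `DictionaryMessageType`, the other constants, and the codes are hypothetical.

```java
// Hypothetical message-type enum with explicit wire codes.
enum DictionaryMessageType {
  DICT_GENERATION((byte) 1),
  SIZE((byte) 2),
  WRITE_DICTIONARY((byte) 3);

  private final byte code;

  DictionaryMessageType(byte code) {
    this.code = code;
  }

  // The single byte written to the network for this type.
  byte getCode() {
    return code;
  }

  // Maps a code read from the network back to its type; rejects unknown codes.
  static DictionaryMessageType fromCode(byte code) {
    for (DictionaryMessageType t : values()) {
      if (t.code == code) {
        return t;
      }
    }
    throw new IllegalArgumentException("Unknown dictionary message type: " + code);
  }
}
```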
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363743
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java ---
@@ -42,38 +38,40 @@
 private static final LogService LOGGER = LogServiceFactory.getLogService(DictionaryServer.class.getName());
- private ServerBootstrap bootstrap;
- private DictionaryServerHandler dictionaryServerHandler;
+ private EventLoopGroup boss;
+ private EventLoopGroup worker;
+
 /**
 * start dictionary server
 *
 * @param port
 * @throws Exception
 */
 public void startServer(int port) {
- bootstrap = new ServerBootstrap();
 dictionaryServerHandler = new DictionaryServerHandler();
+ boss = new NioEventLoopGroup();
+ worker = new NioEventLoopGroup();
+ // Configure the server.
+ try {
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(boss, worker);
+ bootstrap.channel(NioServerSocketChannel.class);
- ExecutorService boss = Executors.newCachedThreadPool();
- ExecutorService worker = Executors.newCachedThreadPool();
-
- bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
+ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+ }
+ });
+ bootstrap.bind(port).sync();
--- End diff --
I think SO_KEEPALIVE is needed, and please log the port it is listening on:
```
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind server address
bindFuture = bootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly();
localAddr = (InetSocketAddress) bindFuture.channel().localAddress();
long endTime = System.currentTimeMillis();
LOGGER.info("Dictionary Server started! Time spent " + (endTime - startTime) + ", listening on " + localAddr);
```
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363987
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java ---
@@ -83,9 +81,12 @@ public ChannelPipeline getPipeline() throws Exception {
 */
 public void shutdown() throws Exception {
 DictionaryKey key = new DictionaryKey();
- key.setType("WRITE_DICTIONARY");
+ key.setType(DictionaryKeyType.WRITE_DICTIONARY);
 dictionaryServerHandler.processMessage(key);
- bootstrap.releaseExternalResources();
- bootstrap.shutdown();
+ worker.shutdownGracefully();
--- End diff --
How about using the bootstrap to shut it down?
```
long startTime = System.currentTimeMillis();
if (bindFuture != null) {
  bindFuture.channel().close().awaitUninterruptibly();
  bindFuture = null;
}
if (bootstrap != null) {
  if (bootstrap.group() != null) {
    bootstrap.group().shutdownGracefully();
  }
  bootstrap = null;
}
long endTime = System.currentTimeMillis();
LOGGER.info("Dictionary Server shutdown! Time spent " + (endTime - startTime));
```
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102366409
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java ---
@@ -36,17 +36,76 @@
 /**
 * message data
 */
- private Object data;
+ private String data;
 /**
- * message type
+ * Dictionary Value
 */
- private String type;
+ private int dictionaryValue = -1;
--- End diff --
Use `CarbonCommonConstants.INVALID_SURROGATE_KEY`.
Github user chenliang613 commented on the issue:
https://github.com/apache/incubator-carbondata/pull/605
@ravipesala One question: will you add related test cases for the "Single-Pass data load" feature?
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682192
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
@@ -44,7 +40,7 @@
 private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
- private ClientBootstrap clientBootstrap;
+ private NioEventLoopGroup workerGroup;
--- End diff --
I need to keep the `NioEventLoopGroup` to shut down the server; it cannot be shut down from the bootstrap.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682365
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
@@ -86,7 +78,11 @@ public DictionaryKey getDictionary(DictionaryKey key) {
 * shutdown dictionary client
 */
 public void shutDown() {
- clientBootstrap.releaseExternalResources();
- clientBootstrap.shutdown();
+ workerGroup.shutdownGracefully();
--- End diff --
The `clientBootstrap.group()` method does not return a `NioEventLoopGroup`, so we cannot shut it down this way.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682394
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -16,57 +16,59 @@
 */
 package org.apache.carbondata.core.dictionary.client;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 /**
 * Client handler to get data.
 */
-public class DictionaryClientHandler extends SimpleChannelHandler {
+public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
 private static final LogService LOGGER = LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
- final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
+ final BlockingQueue<DictionaryKey> dictKeyQueue = new LinkedBlockingQueue<>();
--- End diff --
OK.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682570
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
 */
 public DictionaryKey getDictionary(DictionaryKey key) {
 DictionaryKey dictionaryKey;
- BlockingQueue<DictionaryKey> dictKeyQueue = null;
 try {
- synchronized (lock) {
- dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- if (dictKeyQueue == null) {
- dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
- dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
- }
- }
- byte[] serialize = KryoRegister.serialize(key);
- ctx.getChannel().write(serialize);
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer);
--- End diff --
OK, handled.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/605#discussion_r102692467
--- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
@@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
 */
 public DictionaryKey getDictionary(DictionaryKey key) {
 DictionaryKey dictionaryKey;
- BlockingQueue<DictionaryKey> dictKeyQueue = null;
 try {
- synchronized (lock) {
- dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- if (dictKeyQueue == null) {
- dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
- dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
- }
- }
- byte[] serialize = KryoRegister.serialize(key);
- ctx.getChannel().write(serialize);
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer);
 } catch (Exception e) {
- LOGGER.error("Error while send request to server " + e.getMessage());
- ctx.getChannel().close();
+ LOGGER.error(e, "Error while send request to server " + e.getMessage());
+ ctx.close();
 }
 boolean interrupted = false;
 try {
--- End diff --
OK, the timeout is handled in the current code.