[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

classic Classic list List threaded Threaded
35 messages Options
12
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
GitHub user ravipesala opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/605

    [CARBONDATA-715] Optimize Single-Pass data load flow

    1. Upgrade to latest netty-4.1.8
    2. Optimize the serialization of key for passing in network.
    3. Launch individual dictionary client for each loading thread.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata single-pass-opt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #605
   
----
commit 049378adebd5ed24a2dd2d2c4d91e08d99a2c0bc
Author: ravipesala <[hidden email]>
Date:   2017-02-21T01:15:09Z

    Optimized and upgraded Dictionary Server

commit 8c9d84eb0eef4b4a17c46b7e40fb8e408ea4d3d0
Author: ravipesala <[hidden email]>
Date:   2017-02-21T05:29:37Z

    Added multi dictionary clients for each thread

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata issue #605: [CARBONDATA-715] Optimize Single-Pass data ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/605
 
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/921/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata issue #605: [CARBONDATA-715] Optimize Single-Pass data ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/605
 
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/922/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata issue #605: [CARBONDATA-715] Optimize Single-Pass data ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/605
 
    Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/924/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102358591
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
    @@ -44,7 +40,7 @@
     
       private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
     
    -  private ClientBootstrap clientBootstrap;
    +  private NioEventLoopGroup workerGroup;
    --- End diff --
   
    This can be a local variable in `startClient`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102358957
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
    @@ -86,7 +78,11 @@ public DictionaryKey getDictionary(DictionaryKey key) {
        * shutdown dictionary client
        */
       public void shutDown() {
    -    clientBootstrap.releaseExternalResources();
    -    clientBootstrap.shutdown();
    +    workerGroup.shutdownGracefully();
    --- End diff --
   
    How about do like following:
    ```
    if (clientBootstrap != null) {
          if (clientBootstrap.group() != null) {
            clientBootstrap.group().shutdownGracefully();
          }
          clientBootstrap = null;
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102359538
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -16,57 +16,59 @@
      */
     package org.apache.carbondata.core.dictionary.client;
     
    -import java.util.Map;
     import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.LinkedBlockingQueue;
     
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
    -import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
     
    -import org.jboss.netty.channel.ChannelHandlerContext;
    -import org.jboss.netty.channel.ChannelStateEvent;
    -import org.jboss.netty.channel.ExceptionEvent;
    -import org.jboss.netty.channel.MessageEvent;
    -import org.jboss.netty.channel.SimpleChannelHandler;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
     
     /**
      * Client handler to get data.
      */
    -public class DictionaryClientHandler extends SimpleChannelHandler {
    +public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
     
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
     
    -  final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
    +  final BlockingQueue<DictionaryKey> dictKeyQueue = new LinkedBlockingQueue<>();
    --- End diff --
   
    suggest to change to responseMsgQueue


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102360297
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
        */
       public DictionaryKey getDictionary(DictionaryKey key) {
         DictionaryKey dictionaryKey;
    -    BlockingQueue<DictionaryKey> dictKeyQueue = null;
         try {
    -      synchronized (lock) {
    -        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
    -        if (dictKeyQueue == null) {
    -          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
    -          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
    -        }
    -      }
    -      byte[] serialize = KryoRegister.serialize(key);
    -      ctx.getChannel().write(serialize);
    +      ByteBuf buffer = ctx.alloc().buffer();
    +      key.writeData(buffer);
    +      ctx.writeAndFlush(buffer);
    --- End diff --
   
    better to handle failure case by `addListener`
    ```
        ChannelFuture f = channel.writeAndFlush(request);
        f.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              LOGGER.info("sent success");
            } else {
              LOGGER.info("sent failed");
            }
          }
        });
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102361729
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
        */
       public DictionaryKey getDictionary(DictionaryKey key) {
         DictionaryKey dictionaryKey;
    -    BlockingQueue<DictionaryKey> dictKeyQueue = null;
         try {
    -      synchronized (lock) {
    -        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
    -        if (dictKeyQueue == null) {
    -          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
    -          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
    -        }
    -      }
    -      byte[] serialize = KryoRegister.serialize(key);
    -      ctx.getChannel().write(serialize);
    +      ByteBuf buffer = ctx.alloc().buffer();
    +      key.writeData(buffer);
    +      ctx.writeAndFlush(buffer);
         } catch (Exception e) {
    -      LOGGER.error("Error while send request to server " + e.getMessage());
    -      ctx.getChannel().close();
    +      LOGGER.error(e, "Error while send request to server " + e.getMessage());
    +      ctx.close();
         }
         boolean interrupted = false;
         try {
    --- End diff --
   
    There is no timeout control here. How about using SettingFuture from Guava libary, like
    ```
    final SettableFuture<Integer> dictionaryKey = SettableFuture.create();
        ResponseCallback callback = new ResponseCallback() {
          @Override
          public void invoke(NettyMessage response) {
            if (response instanceof DictGenResponseMessage) {
              DictGenResponseMessage resp = (DictGenResponseMessage) response;
              dictionaryKey.set(resp.getDictionaryKey());
            }
          }
        };
        clientHandler.addRequest(request.id(), callback);
        ChannelFuture f = channel.writeAndFlush(request);
        try {
          return dictionaryKey.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException | ExecutionException e) {
          LOGGER.error("exception while generating dictionary key from Dictionary Service: " +
              e.getMessage());
          throw e;
        }
    ```
    This needs to set a callback in the handler and invoke the callback in `channelRead0` in the handler. Then the blockingQueue is not needed



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363218
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java ---
    @@ -16,12 +16,12 @@
      */
     package org.apache.carbondata.core.dictionary.generator.key;
     
    -import java.io.Serializable;
    +import io.netty.buffer.ByteBuf;
     
     /**
      * Dictionary key to generate dictionary
      */
    -public class DictionaryKey implements Serializable {
    +public class DictionaryKey {
    --- End diff --
   
    How about separate request and response so that response will be lighter.
    Or leave it now and I can do it in another PR


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363432
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKeyType.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.dictionary.generator.key;
    +
    +/**
    + * Dictionary key types.
    + */
    +public enum DictionaryKeyType {
    --- End diff --
   
    This is the message type, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363743
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java ---
    @@ -42,38 +38,40 @@
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(DictionaryServer.class.getName());
     
    -  private ServerBootstrap bootstrap;
    -
       private DictionaryServerHandler dictionaryServerHandler;
     
    +  private EventLoopGroup boss;
    +  private EventLoopGroup worker;
    +
       /**
        * start dictionary server
        *
        * @param port
        * @throws Exception
        */
       public void startServer(int port) {
    -    bootstrap = new ServerBootstrap();
         dictionaryServerHandler = new DictionaryServerHandler();
    +    boss = new NioEventLoopGroup();
    +    worker = new NioEventLoopGroup();
    +    // Configure the server.
    +    try {
    +      ServerBootstrap bootstrap = new ServerBootstrap();
    +      bootstrap.group(boss, worker);
    +      bootstrap.channel(NioServerSocketChannel.class);
     
    -    ExecutorService boss = Executors.newCachedThreadPool();
    -    ExecutorService worker = Executors.newCachedThreadPool();
    -
    -    bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
    +      bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    +        @Override public void initChannel(SocketChannel ch) throws Exception {
    +          ChannelPipeline pipeline = ch.pipeline();
    +          pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
    +        }
    +      });
    +      bootstrap.bind(port).sync();
    --- End diff --
   
    I think SO_KEEPALIVE is needed, and please log the port it is listening:
    ```
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
   
        // bind server address
        bindFuture = bootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly();
        localAddr = (InetSocketAddress) bindFuture.channel().localAddress();
   
        long endTime = System.currentTimeMillis();
        LOGGER.info("Dictionary Server started! Time spent " + (endTime - startTime) +
            ", listening on " + localAddr);
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102363987
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java ---
    @@ -83,9 +81,12 @@ public ChannelPipeline getPipeline() throws Exception {
        */
       public void shutdown() throws Exception {
         DictionaryKey key = new DictionaryKey();
    -    key.setType("WRITE_DICTIONARY");
    +    key.setType(DictionaryKeyType.WRITE_DICTIONARY);
         dictionaryServerHandler.processMessage(key);
    -    bootstrap.releaseExternalResources();
    -    bootstrap.shutdown();
    +    worker.shutdownGracefully();
    --- End diff --
   
    How about using bootstrap to shut it down?
    ```
        long startTime = System.currentTimeMillis();
        if (bindFuture != null) {
          bindFuture.channel().close().awaitUninterruptibly();
          bindFuture = null;
        }
   
        if (bootstrap != null) {
          if (bootstrap.group() != null) {
            bootstrap.group().shutdownGracefully();
          }
          bootstrap = null;
        }
        long endTime = System.currentTimeMillis();
        LOGGER.info("Dictionary Server shutdown! Time spent " + (endTime - startTime));
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102366409
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java ---
    @@ -36,17 +36,76 @@
       /**
        * message data
        */
    -  private Object data;
    +  private String data;
     
       /**
    -   * message type
    +   * Dictionary Value
        */
    -  private String type;
    +  private int dictionaryValue = -1;
    --- End diff --
   
    use `CarbonCommonConstants.INVALID_SURROGATE_KEY`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata issue #605: [CARBONDATA-715] Optimize Single-Pass data ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user chenliang613 commented on the issue:

    https://github.com/apache/incubator-carbondata/pull/605
 
    @ravipesala one query :  whether add related test cases for "Single-Pass data load" feature ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682192
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
    @@ -44,7 +40,7 @@
     
       private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
     
    -  private ClientBootstrap clientBootstrap;
    +  private NioEventLoopGroup workerGroup;
    --- End diff --
   
    I need to have `NioEventLoopGroup` to shutdown the server, cannot shutdown from bootstrap


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682365
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java ---
    @@ -86,7 +78,11 @@ public DictionaryKey getDictionary(DictionaryKey key) {
        * shutdown dictionary client
        */
       public void shutDown() {
    -    clientBootstrap.releaseExternalResources();
    -    clientBootstrap.shutdown();
    +    workerGroup.shutdownGracefully();
    --- End diff --
   
    Method `clientBootstrap.group()` do not return `NioEventLoopGroup` , so we cannot shutdown this way


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682394
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -16,57 +16,59 @@
      */
     package org.apache.carbondata.core.dictionary.client;
     
    -import java.util.Map;
     import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.LinkedBlockingQueue;
     
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
    -import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
     
    -import org.jboss.netty.channel.ChannelHandlerContext;
    -import org.jboss.netty.channel.ChannelStateEvent;
    -import org.jboss.netty.channel.ExceptionEvent;
    -import org.jboss.netty.channel.MessageEvent;
    -import org.jboss.netty.channel.SimpleChannelHandler;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
     
     /**
      * Client handler to get data.
      */
    -public class DictionaryClientHandler extends SimpleChannelHandler {
    +public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
     
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
     
    -  final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
    +  final BlockingQueue<DictionaryKey> dictKeyQueue = new LinkedBlockingQueue<>();
    --- End diff --
   
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102682570
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
        */
       public DictionaryKey getDictionary(DictionaryKey key) {
         DictionaryKey dictionaryKey;
    -    BlockingQueue<DictionaryKey> dictKeyQueue = null;
         try {
    -      synchronized (lock) {
    -        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
    -        if (dictKeyQueue == null) {
    -          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
    -          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
    -        }
    -      }
    -      byte[] serialize = KryoRegister.serialize(key);
    -      ctx.getChannel().write(serialize);
    +      ByteBuf buffer = ctx.alloc().buffer();
    +      key.writeData(buffer);
    +      ctx.writeAndFlush(buffer);
    --- End diff --
   
    Ok, handled


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-carbondata pull request #605: [CARBONDATA-715] Optimize Single-Pas...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/605#discussion_r102692467
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java ---
    @@ -77,20 +79,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
        */
       public DictionaryKey getDictionary(DictionaryKey key) {
         DictionaryKey dictionaryKey;
    -    BlockingQueue<DictionaryKey> dictKeyQueue = null;
         try {
    -      synchronized (lock) {
    -        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
    -        if (dictKeyQueue == null) {
    -          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
    -          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
    -        }
    -      }
    -      byte[] serialize = KryoRegister.serialize(key);
    -      ctx.getChannel().write(serialize);
    +      ByteBuf buffer = ctx.alloc().buffer();
    +      key.writeData(buffer);
    +      ctx.writeAndFlush(buffer);
         } catch (Exception e) {
    -      LOGGER.error("Error while send request to server " + e.getMessage());
    -      ctx.getChannel().close();
    +      LOGGER.error(e, "Error while send request to server " + e.getMessage());
    +      ctx.close();
         }
         boolean interrupted = false;
         try {
    --- End diff --
   
    Ok, timeout is handled in current code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
12