How to use FixedChannelPool with WebSocket?

  • JavaMachine · 7 years ago

    I am trying to use FixedChannelPool with WebSocket. About the most you can find on FixedChannelPool is its Unit Tests.

    The client code I am using is based on this example: Source Code

    From this post I found that bootstrap.handler(new ChannelInitializer<>()) gets overridden by ChannelPool.

    So I tried moving the ChannelInitializer block into a ChannelPoolHandler:

    public class SomeConnectionPoolHandler implements ChannelPoolHandler {
    
        private final URI uri = URI.create("ws://some.url:80");
    
        @Override
        public void channelCreated(Channel ch) {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
    
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
            final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536), handler);
        }
    
        @Override
        public void channelReleased(Channel ch) {
            //TODO
        }
    
        @Override
        public void channelAcquired(Channel ch) {
            //TODO
        }
    }
    

    Now my client code looks like this:

    public class Application {
        // static so that it can be read from main()
        private static final URI uri = URI.create("ws://some.url:80");

        public static void main(String[] args) {
            // Declared here so that the snippet compiles and the finally block can see them.
            EventLoopGroup group = new NioEventLoopGroup();
            FixedChannelPool channelPool = null;
            Channel ch = null;
            try {
                String protocol = uri.getScheme();
                if (!"ws".equals(protocol)) {
                    throw new IllegalArgumentException("Unsupported protocol: " + protocol);
                }
                Bootstrap b = new Bootstrap();
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.remoteAddress(uri.getHost(), uri.getPort());

                b.group(group);
                b.channel(NioSocketChannel.class);
                SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler();

                // Allow at most 8 pooled connections.
                channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
                ch = channelPool.acquire().sync().get();
                ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (ch != null) {
                    channelPool.release(ch);
                }
            }
        }
    }
    

    Now I get this error:

    java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
        at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
        at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
    
    1 answer
  • JavaMachine · 7 years ago

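    OK, I found the solution to the problem.

    The issue was that I was writing to the Channel before the WebSocket handshake had completed.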

    In the handler source code you can see that it has a method that returns the handshake ChannelFuture:

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }
    

    Now, after acquiring a Channel from FixedChannelPool, I call the following method, which returns whether the handshake has completed:

    private boolean isConnectionReady(Channel ch) {
        try {
            WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
            return handler.handshakeFuture().sync().isSuccess();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
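
    A caveat with the check above: sync() blocks until the handshake future completes, so a server that never finishes the handshake would stall the caller indefinitely. Below is a minimal sketch of a bounded wait. It uses only the JDK, with CompletableFuture standing in for Netty's ChannelFuture; HandshakeGate and isReady are hypothetical names, not part of Netty. With a real ChannelFuture, the rough equivalent would be await(timeoutMillis) followed by isSuccess().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: CompletableFuture is a stand-in for Netty's ChannelFuture.
final class HandshakeGate {
    private HandshakeGate() {}

    // Returns true only if the handshake future completes successfully
    // within timeoutMillis; a timeout, failure or cancellation yields false.
    static boolean isReady(CompletableFuture<Void> handshake, long timeoutMillis) {
        try {
            handshake.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    The caller would write a frame only when isReady returns true, and otherwise release or close the connection instead of writing to it.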
    

    I also gave the WebSocketClientHandler a name in channelCreated(Channel ch):

    DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
    final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536));
    pipeline.addLast("handler", handler);
    

    I know this code looks bad right now and should be refactored.
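
    One possible direction for that refactoring, as a sketch only: fold the handshake wait into the acquisition step, so that callers only ever receive a connection whose handshake has finished, and do so without blocking a thread. The model below is JDK-only. PooledConnection, ReadyAcquirer and acquireReady are hypothetical names, and CompletableFuture stands in for the futures returned by FixedChannelPool.acquire() and handshakeFuture().

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for a pooled channel that exposes its handshake future.
final class PooledConnection {
    final String id;
    final CompletableFuture<Void> handshake = new CompletableFuture<>();

    PooledConnection(String id) {
        this.id = id;
    }
}

// Hypothetical helper that chains "acquire" and "handshake done" into one future.
final class ReadyAcquirer {
    private ReadyAcquirer() {}

    // The returned future completes with the connection once its handshake
    // succeeds, and completes exceptionally if acquisition or handshake fails.
    static CompletableFuture<PooledConnection> acquireReady(
            Supplier<CompletableFuture<PooledConnection>> acquire) {
        return acquire.get()
                .thenCompose(conn -> conn.handshake.thenApply(ignored -> conn));
    }
}
```

    In Netty terms, the same shape could presumably be built by adding a listener to the acquire future and another to handshakeFuture(), completing a Promise<Channel> from the second listener.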