代码之家  ›  专栏  ›  技术社区  ›  goshi

添加解码器/编码器时网络管道破裂

  •  2
  • goshi  · 技术社区  · 7 年前

    接下来我添加了代码。

    public final class HexDumpProxy {
    
    public static void main(String[] args) throws Exception {
    
        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HexDumpProxyInitializer("127.0.0.1", 9000))
             .childOption(ChannelOption.AUTO_READ, false)
             .bind(8000).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            }
        }
    }
    

    public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
    
    private final String remoteHost;
    private final int remotePort;
    
    public HexDumpProxyInitializer (String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("frameDecoder", new LineBasedFrameDecoder(80));
        ch.pipeline().addLast("decoder", new StringDecoder());
        ch.pipeline().addLast("encoder", new StringEncoder());
        //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        ch.pipeline().addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
        }
    
    }
    

    前端处理程序类:

    public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
    
    private final String remoteHost;
    private final int remotePort;
    
    private Channel serverChannel;
    
    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel clientChannel = ctx.channel();
    
        Bootstrap b = new Bootstrap();
        b.group(clientChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(clientChannel))
         .option(ChannelOption.AUTO_READ, false);
    
        ChannelFuture f = b.connect(remoteHost, remotePort);
        serverChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    clientChannel.read();
                }
                else
                    clientChannel.close();
                }
    
            });
    }
    
    @Override
    public void channelRead (final ChannelHandlerContext ctx, Object msg) {
        if (serverChannel.isActive()) {
            System.out.println("************" + msg);
            serverChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        ctx.channel().read();
                        System.out.println("Read from server channel. - channelRead");
                    } else {
                        future.channel().close();
                        System.out.println("Close server channel.");
                    }
                }
            });
        }
    }
    
    @Override
    public void channelInactive (ChannelHandlerContext ctx) {
        if (serverChannel != null) {
            closeOnFlush(serverChannel);
            System.out.println("close on flush - server channel");
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    
    static void closeOnFlush (Channel ch) {
        if (ch.isActive())
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        ctx.channel().read();
        }
    }
    

    后端处理程序类:

    public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {
    
    private final Channel clientChannel;
    
    public HexDumpProxyBackendHandler (Channel clientChannel) {
        this.clientChannel = clientChannel;
    }
    
    @Override
    public void channelActive (ChannelHandlerContext ctx) {
        ctx.channel().read();
        System.out.println("Read from client channel. - channelActive");
    }
    
    @Override
    public void channelRead (final ChannelHandlerContext ctx, Object msg) {
        System.out.println("~~~~~~~~~~~" + msg);
        clientChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    clientChannel.read();
                    System.out.println("Read from client channel. - channelRead");
                } else {
                    clientChannel.close();
                    System.out.println("Close client channel.");
                    }
                }
            });
    }
    
    @Override
    public void channelInactive (ChannelHandlerContext ctx) {
        HexDumpProxyFrontendHandler.closeOnFlush(clientChannel);
        System.out.println("close on flush - client channel");
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        ctx.channel().read();
        }
    }
    
    1 回复  |  直到 7 年前
        1
  •  0
  •   goshi    7 年前

    课堂上 HexDumpProxyFrontendHandler 而不是使用新的 HexDumpProxyBackendHandler(clientChannel) 作为引导过程中的处理程序,创建一个新类,例如。 HexDumpProxyBackendInitializer(clientChannel) 在类中,以与在类中相同的方式初始化管道 HexDumpProxyInitializer .