代码之家  ›  专栏  ›  技术社区  ›  madhead

Netty请求超时

  •  1
  • madhead  · 技术社区  · 6 年前

    我正在尝试编写一个HTTP服务,它将从HTTP获取数据,并使用Netty将其放入Kafka中。我需要在一个m5.large的EC2实例上处理20K的RPS,这看起来非常可行。

    代码很简单:

    Server.java

    public class Server {
        public static void main(final String[] args) throws Exception {
            final EventLoopGroup bossGroup = new EpollEventLoopGroup();
            final EventLoopGroup workerGroup = new EpollEventLoopGroup();
    
            try {
                final ServerBootstrap bootstrap = new ServerBootstrap();
    
                bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(EpollServerSocketChannel.class)
                    .childHandler(new RequestChannelInitializer(createProducer()))
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.bind(PORT).sync().channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private static Producer<String, ByteBuffer> createProducer() {
            final Properties properties = new Properties();
    
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
            properties.put(ProducerConfig.RETRIES_CONFIG, 0);
            properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
            properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
            properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);
    
            return new KafkaProducer<>(properties);
        }
    }
    

    RequestChannelInitializer.java

    public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
        private final Producer<String, ByteBuffer> producer;
    
        public BidRequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
            this.producer = producer;
        }
    
        @Override
        public void initChannel(final SocketChannel ch) {
            ch.pipeline().addLast(new HttpServerCodec());
            ch.pipeline().addLast(new HttpObjectAggregator(1048576));
            ch.pipeline().addLast(new RequestHandler(producer));
        }
    }
    

    RequestHandler.java

    public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
        private final Producer<String, ByteBuffer> producer;
    
        public BidRequestHandler(final Producer<String, ByteBuffer> producer) {
            this.producer = producer;
        }
    
        @Override
        public void channelReadComplete(final ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
            final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
            final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
                "test",
                UUID.randomUUID().toString(),
                msg.content().nioBuffer()
            );
    
            producer.send(record);
    
            if (HttpUtil.isKeepAlive(msg)) {
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
    
            ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
    

    代码取自官方文档。但是,有时我会 Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms 负载测试中出现异常。

    据我所知,这意味着在我的负载测试实例和服务实例之间建立了连接,但完成连接需要60秒。这个简单程序的哪个部分可以阻塞这么长时间?

    我已经调整了卡夫卡制片人:减少了超时时间。我知道 send 可能是阻塞,所以我增加了发送缓冲区,但没有帮助。 我也增加了 ulimits 对于服务用户。 我在OpenJDK 1.8.0u171上运行 securerandom.source 设置为 file:/dev/urandom ,所以 randomUUID 不应该阻塞。

    1 回复  |  直到 6 年前
        1
  •  1
  •   mjuarez    6 年前

    你说得对,里面没有什么可以阻挡的。发送给卡夫卡的调用是异步的。我看过你的代码,从我所看到的来看一切都很好。

    我要检查几件事:

    • 确保AWS中的安全组定义允许Kafka服务器和应用程序与Zookeeper对话。如果这是一个test/POC,那么您应该只允许所有三个实例/集群之间的所有流量。60秒的超时使我怀疑网络超时,这可能意味着某些服务无法访问。
    • 你有没有试过一个更简单的测试,试着在没有网络依赖的情况下制作卡夫卡?也许这有助于缩小问题的范围。