代码之家  ›  专栏  ›  技术社区  ›  Abhijit Sarkar

如何解压缩通量(以及如何编写通量)?

  •  8
  • Abhijit Sarkar  · 技术社区  · 7 年前

    我需要在没有中间存储的情况下读写压缩(GZIP)流。目前,我正在使用Spring RestTemplate 编写,Apache HTTP客户端读取(参见我的答案 here 解释原因 RestTemplate 无法用于读取大数据流)。实现相当简单,其中 GZIPInputStream 关于回应 InputStream 继续前进。

    现在,我想切换到使用Spring 5 WebClient (只是因为我不喜欢现状)。然而 WebClient 本质上是反应性的 Flux<Stuff> ; 我相信有可能 Flux<DataBuffer> 哪里 DataBuffer 抽象是否结束 ByteBuffer . 问题是,我如何动态解压缩它,而不必将整个流存储在内存中( OutOfMemoryError ,我在看你),还是在写本地磁盘?值得一提的是 网络客户端 在引擎盖下使用Netty。

    我承认我对(反)压缩知之甚少,然而,我做了研究,但网上提供的任何资料似乎都没有特别的帮助。

    compression on java nio direct buffers

    Writing GZIP file with nio

    Reading a GZIP file from a FileChannel (Java NIO)

    (de)compressing files using NIO

    Iterable gzip deflate/inflate in Java

    2 回复  |  直到 7 年前
        1
  •  4
  •   Abhijit Sarkar    7 年前
    public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
        private final HttpHeaders httpHeaders;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof HttpResponse &&
                    !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
                HttpHeaders headers = ((HttpResponse) msg).headers();
    
                httpHeaders.forEach(e -> {
                    log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                    headers.set(e.getKey(), e.getValue());
                });
            }
            ctx.fireChannelRead(msg);
        }
    }
    

    然后我创建一个 ClientHttpConnector 用于 WebClient 并且在 afterNettyContextInit 添加处理程序:

    ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
    ctx.addHandlerLast(new Slf4JLoggingHandler());
    if (forceDecompression) {
        io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
                true,
                CONTENT_ENCODING, GZIP,
                CONTENT_TYPE, APPLICATION_JSON
        );
        HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
        ctx.addHandlerFirst(headersModifier);
    }
    ctx.addHandlerLast(new HttpContentDecompressor());
    

    当然,对于没有经过GZIP压缩的响应,这将失败,所以我使用 网络客户端 仅针对特定用例,我确信响应是压缩的。

    写作很容易:Spring有一个 ResourceEncoder 所以 InputStream 可以简单地转换为 InputStreamResource ,瞧!

        2
  •  1
  •   Michael Berry    5 年前

    注意到这一点,因为它让我有点困惑——API从5.1开始有了一些变化。

    我的设置与 ChannelInboundHandler :

    public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof HttpResponse
                    && !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
                HttpHeaders headers = ((HttpResponse) msg).headers();
                headers.clear();
                headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
                headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
            }
            ctx.fireChannelRead(msg);
        }
    }
    

    (为了简单起见,我需要的标题值只是硬编码的,否则就完全相同了。)

    然而,注册它是不同的:

    WebClient.builder()
        .clientConnector(
                new ReactorClientHttpConnector(
                        HttpClient.from(
                                TcpClient.create()
                                        .doOnConnected(c -> {
                                            c.addHandlerFirst(new HttpContentDecompressor());
                                            c.addHandlerFirst(new HttpResponseHeadersHandler());
                                        })
                        ).compress(true)
                )
        )
        .build();
    

    Netty现在似乎在系统列表之外(以及之后)维护一个处理程序的用户列表 addHandlerFirst() 仅将处理程序放在用户列表的前面。因此,它需要显式调用 HttpContentDecompressor 以确保在处理程序插入正确的头之后执行它。