代码之家  ›  专栏  ›  技术社区  ›  molf

如何在转发流的同时从流中复制数据

  •  4
  • molf  · 技术社区  · 6 年前

    我在用 hyper 0.12以生成代理服务。当从上游服务器接收到响应体时,我希望尽快将其转发回客户端, 将内容保存在缓冲区中以备以后处理。

    所以我需要一个函数:

    • 拿一个 Stream (一) hyper::Body ,准确地说)
    • 返回 溪流 在功能上与输入流相同的
    • 返回某种 Future<Item = Vec<u8>, Error = ...> 当输出流完全消耗时,用输入流的缓冲内容解析

    我一辈子都不知道怎么做。

    我想我要找的功能是这样的:

    type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
    pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
        let body2 = ... // ???
        let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
            buf.extend_from_slice(&chunk);
            // ...somehow send this chunk to body2 also?
        });
        (body2, buffer);
    }
    

    下面是我尝试过的,直到 send_data() 失败(显然)。

    type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
    pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
        let (mut sender, body2) = hyper::Body::channel();
        let consume =
            body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
                buf.extend_from_slice(&chunk);
    
                // What to do if this fails?
                if sender.send_data(chunk).is_err() {}
                Box::new(future::ok(buf))
            });
    
        (body2, Box::new(consume));
    }
    

    然而,有些事告诉我我走错了方向。

    我发现了 Sink.fanout() 这似乎是我想要的,但我没有 Sink ,我不知道如何构造一个。 超级::正文 工具 溪流 但不是 水槽 .

    1 回复  |  直到 6 年前
        1
  •  3
  •   molf    6 年前

    我最终做的是实现一种新类型的流,它可以满足我的需要。这似乎是必要的,因为 hyper::Body 不执行 Sink 也不是 hyper::Chunk 实施 Clone (这是 Sink.fanout() 因此,我不能使用任何现有的组合器。

    首先是一个结构,它包含我们需要的所有详细信息和附加新块的方法,并通知缓冲区已完成。

    struct BodyClone<T> {
        body: T,
        buffer: Option<Vec<u8>>,
        sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
    }
    
    impl BodyClone<hyper::Body> {
        fn flush(&mut self) {
            if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
                if sender.send(buffer).is_err() {}
            }
        }
    
        fn push(&mut self, chunk: &hyper::Chunk) {
            use hyper::body::Payload;
    
            let length = if let Some(buffer) = self.buffer.as_mut() {
                buffer.extend_from_slice(chunk);
                buffer.len() as u64
            } else {
                0
            };
    
            if let Some(content_length) = self.body.content_length() {
                if length >= content_length {
                    self.flush();
                }
            }
        }
    }
    

    然后我实现了 Stream 这个结构的特征。

    impl Stream for BodyClone<hyper::Body> {
        type Item = hyper::Chunk;
        type Error = hyper::Error;
    
        fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
            match self.body.poll() {
                Ok(Async::Ready(Some(chunk))) => {
                    self.push(&chunk);
                    Ok(Async::Ready(Some(chunk)))
                }
                Ok(Async::Ready(None)) => {
                    self.flush();
                    Ok(Async::Ready(None))
                }
                other => other,
            }
        }
    }
    

    最后,我可以在 超级::正文 :

    pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>;
    
    trait CloneBody {
        fn clone_body(self) -> (hyper::Body, BufferFuture);
    }
    
    impl CloneBody for hyper::Body {
        fn clone_body(self) -> (hyper::Body, BufferFuture) {
            let (sender, receiver) = futures::sync::oneshot::channel();
    
            let cloning_stream = BodyClone {
                body: self,
                buffer: Some(Vec::new()),
                sender: Some(sender),
            };
    
            (
                hyper::Body::wrap_stream(cloning_stream),
                Box::new(receiver.map_err(|_| ())),
            )
        }
    }
    

    这可以如下使用:

    let (body: hyper::Body, buffer: BufferFuture) = body.clone_body();