
.NET asynchronous stream read/write

  •  45
  • XpiritO  · Tech Community  · 15 years ago

    I've been trying to solve this "Concurrent Programming" exam exercise (in C#):

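    Knowing that the Stream class contains the methods int Read(byte[] buffer, int offset, int size) and void Write(byte[] buffer, int offset, int size), implement in C# the NetToFile method, which copies all data received from the NetworkStream net instance to the FileStream file instance. To do the transfer, use asynchronous reads and synchronous writes, avoiding blocking a thread during read operations. The transfer ends when the net read operation returns the value 0. To simplify, it is not necessary to support controlled cancellation of the operation.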

    void NetToFile(NetworkStream net, FileStream file);
    

    I've been working on this, but I'm struggling with a question about the problem statement itself. But first, here is my code:

    using System;
    using System.IO;
    using System.Net.Sockets;
    
    public static void NetToFile(NetworkStream net, FileStream file) {
        byte[] buffer = new byte[4096]; // 4 KB buffer
        int nBytesRead = 0; // number of bytes read on each cycle
    
        IAsyncResult ar;
        do {
            // read partial content of net (asynchronously), always into the start of the buffer
            ar = net.BeginRead(buffer, 0, buffer.Length, null, null);
            // wait until read is completed
            ar.AsyncWaitHandle.WaitOne();
            // get number of bytes read on each cycle
            nBytesRead = net.EndRead(ar);
    
            // write partial content to file (synchronously)
            file.Write(buffer, 0, nBytesRead);
        }
        while (nBytesRead > 0);
    }
    

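    My question is that the problem statement says:
    
    To do the transfer, use asynchronous reads and synchronous writes, avoiding blocking a thread during read operations
    
    I'm not sure whether my solution accomplishes what this exercise requires, because I'm using AsyncWaitHandle.WaitOne() to wait for the asynchronous read to complete.
    
    On the other hand, I haven't really figured out what a "non-blocking" solution would look like in this scenario, since the FileStream write has to be done synchronously... To do that, I have to wait until the NetworkStream read completes before continuing with the FileStream write, don't I?
    
    Can you help me with this?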


    [EDIT 1] Solution using a callback

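    OK, if I understood what Mitchel Sellers and willvv answered, I've been advised to use a callback method to turn this into a "non-blocking" solution. Here is my code, then: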

    using System;
    using System.IO;
    using System.Net.Sockets;
    
    static byte[] buffer; // buffer (a static field shared by every call)
    
    public static void NetToFile(NetworkStream net, FileStream file) {
        // 4 KB buffer (file.Length is 0 for a newly created file, so it can't size the buffer)
        buffer = new byte[4096];
        // start asynchronous read
        net.BeginRead(buffer, 0, buffer.Length, OnEndRead, net);
    }
    
    // asynchronous callback
    static void OnEndRead(IAsyncResult ar) {
        // retrieve the NetworkStream passed as the state argument
        NetworkStream net = (NetworkStream) ar.AsyncState;
        // get number of bytes read
        int nBytesRead = net.EndRead(ar);
    
        // write content to file
        // ... and now, how do I write to the FileStream instance without
        // having its reference??
        // file.Write(buffer, 0, nBytesRead);
    }
    

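    As you may have noticed, I'm stuck in the callback method, because I don't have a reference to the FileStream instance on which I want to call the Write(...) method.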

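    Also, this is not a thread-safe solution, because the byte[] field is shared and could be overwritten by concurrent NetToFile calls. I don't know how to solve this without exposing the byte[] field in the outer scope... and I'm almost sure it shouldn't be exposed like this.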

    I don't want to use a lambda or anonymous method solution, because that isn't covered in the "Concurrent Programming" course.

    6 answers  |  last active 7 years ago
        1
  •  12
  •   bendewey    15 years ago

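    You'll need to use the callback from the NetworkStream read to handle this. Frankly, it might be easier to wrap the copy logic into its own class so that you can keep references to the active streams.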

    This is how I would approach it (untested):

    using System;
    using System.IO;
    using System.Net.Sockets;
    using System.Threading;
    
    public class Assignment1
    {
        public static void NetToFile(NetworkStream net, FileStream file) 
        {
            var copier = new AsyncStreamCopier(net, file);
            copier.Start();
        }
    
        public static void NetToFile_Option2(NetworkStream net, FileStream file) 
        {
            var completedEvent = new ManualResetEvent(false);
    
            // copy as usual but listen for completion
            var copier = new AsyncStreamCopier(net, file);
            copier.Completed += (s, e) => completedEvent.Set();
            copier.Start();
    
            completedEvent.WaitOne();
        }
    
        /// <summary>
        /// The Async Copier class reads the input Stream Async and writes Synchronously
        /// </summary>
        public class AsyncStreamCopier
        {
            public event EventHandler Completed;
    
            private readonly Stream input;
            private readonly Stream output;
    
            private byte[] buffer = new byte[4096];
    
            public AsyncStreamCopier(Stream input, Stream output)
            {
                this.input = input;
                this.output = output;
            }
    
            public void Start()
            {
                GetNextChunk();
            }
    
            private void GetNextChunk()
            {
                input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
            }
    
            private void InputReadComplete(IAsyncResult ar)
            {
                // input read asynchronously completed
                int bytesRead = input.EndRead(ar);
    
                if (bytesRead == 0)
                {
                    RaiseCompleted();
                    return;
                }
    
                // write synchronously
                output.Write(buffer, 0, bytesRead);
    
                // get next
                GetNextChunk();
            }
    
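            private void RaiseCompleted()
            {
                // copy to a local so a concurrent unsubscribe can't null the field between check and call
                var handler = Completed;
                if (handler != null)
                {
                    handler(this, EventArgs.Empty);
                }
            }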
        }
    }
    
        2
  •  52
  •   Nicholas Carey    8 years ago

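    Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish it. All you need to do is overlap your read/write operations; there's no need to spawn additional threads or anything else.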

    using System;
    using System.IO;
    using System.Threading.Tasks;
    
    public static class StreamExtensions
    {
        private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // 32767 bytes
    
        // These are deliberately not called CopyTo/CopyToAsync: Stream already defines
        // instance methods with those signatures (.NET 4 / 4.5+), and C# always prefers an
        // applicable instance method over an extension method, so same-named extensions
        // would never be called.
        public static void CopyToOverlapped( this Stream input , Stream output )
        {
            input.CopyToOverlapped( output , DEFAULT_BUFFER_SIZE ) ;
            return ;
        }
        public static void CopyToOverlapped( this Stream input , Stream output , int bufferSize )
        {
            if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
            if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
    
            byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
            int[]        bufl  = { 0 , 0 }                                       ;
            int          bufno = 0 ;
            IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
            IAsyncResult write = null ;
    
            while ( true )
            {
    
                // wait for the read operation to complete
                read.AsyncWaitHandle.WaitOne() ; 
                bufl[bufno] = input.EndRead(read) ;
    
                // if zero bytes read, the copy is complete
                if ( bufl[bufno] == 0 )
                {
                    break ;
                }
    
                // wait for the in-flight write operation, if one exists, to complete
                // the only time one won't exist is after the very first read operation completes
                if ( write != null )
                {
                    write.AsyncWaitHandle.WaitOne() ;
                    output.EndWrite(write) ;
                }
    
                // start the new write operation
                write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;
    
                // toggle the current, in-use buffer
                // and start the read operation on the new buffer.
                //
                // Changed to use XOR to toggle between 0 and 1.
                // A little speedier than using a ternary expression.
                bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
                read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
    
            }
    
            // wait for the final in-flight write operation, if one exists, to complete
            // the only time one won't exist is if the input stream is empty.
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }
    
            output.Flush() ;
    
            // return to the caller ;
            return ;
        }
    
    
        public static async Task CopyToOverlappedAsync( this Stream input , Stream output )
        {
            await input.CopyToOverlappedAsync( output , DEFAULT_BUFFER_SIZE ) ;
            return;
        }
    
        public static async Task CopyToOverlappedAsync( this Stream input , Stream output , int bufferSize )
        {
            if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
            if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
    
            byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
            int[]        bufl  = { 0 , 0 } ;
            int          bufno = 0 ;
            Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
            Task         write = null ;
    
            while ( true )
            {
    
                bufl[bufno] = await read ;
    
                // if zero bytes read, the copy is complete
                if ( bufl[bufno] == 0 )
                {
                    break;
                }
    
                // wait for the in-flight write operation, if one exists, to complete
                // the only time one won't exist is after the very first read operation completes
                if ( write != null )
                {
                    await write ;
                }
    
                // start the new write operation
                write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;
    
                // toggle the current, in-use buffer
                // and start the read operation on the new buffer.
                //
                // Changed to use XOR to toggle between 0 and 1.
                // A little speedier than using a ternary expression.
                bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
                read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );
    
            }
    
            // wait for the final in-flight write operation, if one exists, to complete
            // the only time one won't exist is if the input stream is empty.
            if ( write != null )
            {
                await write;
            }
    
            await output.FlushAsync();
    
            // return to the caller ;
            return;
        }
    
    }
    

    Cheers.

        3
  •  16
  •   John Leidegren    9 years ago

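    I doubt this is the fastest code (there's some overhead from the .NET Task abstraction), but I do think it's a cleaner approach to the whole async copy thing.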

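    I needed a CopyTransformAsync to which I could pass a delegate that does something with each chunk as it passes through the copy operation, for example computing a message digest while copying. That's why I was interested in rolling my own.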
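    To illustrate the message-digest case, here is a minimal sketch of such a delegate: a pass-through transform with the same Func<ArraySegment<byte>, ArraySegment<byte>> shape as the CopyTransformAsync parameter, feeding each chunk into a running hash. It assumes System.Security.Cryptography.IncrementalHash (.NET Framework 4.7+ or .NET Core); DigestTransformSketch, DigestTransform and HashInChunks are hypothetical names, and HashInChunks only simulates the chunks a copy loop would deliver.

```csharp
using System;
using System.Security.Cryptography;

public static class DigestTransformSketch
{
    // Hypothetical helper: returns a pass-through transform with the same shape as the
    // transform parameter of CopyTransformAsync. Each chunk is appended to the running
    // hash and handed back unchanged, so the copy writes exactly what it read.
    public static Func<ArraySegment<byte>, ArraySegment<byte>> DigestTransform(IncrementalHash hash)
    {
        return chunk =>
        {
            hash.AppendData(chunk.Array, chunk.Offset, chunk.Count);
            return chunk;
        };
    }

    // Hypothetical helper: simulates a copy loop by pushing `data` through the transform
    // in chunks of at most `chunkSize` bytes, then returns the final SHA-256 digest.
    public static byte[] HashInChunks(byte[] data, int chunkSize)
    {
        using (var hash = IncrementalHash.CreateHash(HashAlgorithmName.SHA256))
        {
            var transform = DigestTransform(hash);
            for (int offset = 0; offset < data.Length; offset += chunkSize)
            {
                int count = Math.Min(chunkSize, data.Length - offset);
                transform(new ArraySegment<byte>(data, offset, count));
            }
            return hash.GetHashAndReset();
        }
    }
}
```

    Because the hash is updated incrementally, the final digest does not depend on how the data happened to be split into chunks, so it equals a one-shot SHA-256 of the whole stream.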

    Findings:

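    • CopyToAsync is sensitive to buffer size (it needs a large buffer)
    • FileOptions.Asynchronous makes it horribly slow (not sure why)
    • The buffer size of the FileStream objects can be smaller (it doesn't matter that much)
    • The Serial test is clearly the fastest and the most resource-intensive.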

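    Here's what I found, along with the complete source code for the program I used to test this. On my machine these tests were run on an SSD, and the workload is equivalent to a file copy. Normally you wouldn't use this just to copy files; it's when you have a network stream (which is my use case) that you'd want something like this.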

    4K buffer
    
    Serial...                                in 0.474s
    CopyToAsync...                           timed out
    CopyToAsync (Asynchronous)...            timed out
    CopyTransformAsync...                    timed out
    CopyTransformAsync (Asynchronous)...     timed out
    
    8K buffer
    
    Serial...                                in 0.344s
    CopyToAsync...                           timed out
    CopyToAsync (Asynchronous)...            timed out
    CopyTransformAsync...                    in 1.116s
    CopyTransformAsync (Asynchronous)...     timed out
    
    40K buffer
    
    Serial...                                in 0.195s
    CopyToAsync...                           in 0.624s
    CopyToAsync (Asynchronous)...            timed out
    CopyTransformAsync...                    in 0.378s
    CopyTransformAsync (Asynchronous)...     timed out
    
    80K buffer
    
    Serial...                                in 0.190s
    CopyToAsync...                           in 0.355s
    CopyToAsync (Asynchronous)...            in 1.196s
    CopyTransformAsync...                    in 0.300s
    CopyTransformAsync (Asynchronous)...     in 0.886s
    
    160K buffer
    
    Serial...                                in 0.432s
    CopyToAsync...                           in 0.252s
    CopyToAsync (Asynchronous)...            in 0.454s
    CopyTransformAsync...                    in 0.447s
    CopyTransformAsync (Asynchronous)...     in 0.555s
    

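    Here you can see a Process Explorer performance graph captured while the tests ran. Basically, each peak (in the lowest of the three graphs) is the start of a Serial test. You can clearly see how throughput increases dramatically as the buffer size grows. It appears to plateau around 80K, which is the buffer size the .NET Framework CopyToAsync method uses internally.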

    [Image: Process Explorer performance graph recorded while the tests ran]

    The nice thing here is that the final implementation isn't that complicated:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    
    static Task CompletedTask = ((Task)Task.FromResult(0));
    static async Task CopyTransformAsync(Stream inputStream
        , Stream outputStream
        , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
        , int bufferSize = 81920 // chunk size; 81920 matches the .NET Framework CopyToAsync default
        )
    {
        var temp = new byte[bufferSize];
        var temp2 = new byte[bufferSize];
    
        int i = 0;
    
        var readTask = inputStream
            .ReadAsync(temp, 0, bufferSize)
            .ConfigureAwait(false);
    
        var writeTask = CompletedTask.ConfigureAwait(false);
    
        for (; ; )
        {
            // synchronize read
            int read = await readTask;
            if (read == 0)
            {
                break;
            }
    
            if (i++ > 0)
            {
                // synchronize write
                await writeTask;
            }
    
            var chunk = new ArraySegment<byte>(temp, 0, read);
    
            // do transform (if any)
            if (transform != null)
            {
                chunk = transform(chunk);
            }
    
            // queue write
            writeTask = outputStream
                .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
                .ConfigureAwait(false);
    
            // queue read into the other buffer while the write is in flight
            readTask = inputStream
                .ReadAsync(temp2, 0, bufferSize)
                .ConfigureAwait(false);
    
            // swap buffers: temp is now the one being read into, temp2 the one being written
            var temp3 = temp;
            temp = temp2;
            temp2 = temp3;
        }
    
        await writeTask; // complete any lingering write task
    }
    

    Despite the hefty buffer, this interleaved read/write approach is 18% faster than the BCL CopyToAsync.

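    Out of curiosity, I did change the async calls to the typical Begin/End async pattern calls, and that didn't improve the situation at all; it made it worse. For all that I like to bash the Task abstraction overhead, it does some nifty things when you write your code with the async/await keywords, and the code is much nicer to read!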
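    To repeat the buffer-size comparison on other hardware, a timing harness along these lines could be used. It is a sketch, not the answer's test program: BufferSizeTiming and TimeCopiesAsync are hypothetical names, the streams come from caller-supplied factories (FileStreams for a disk test, MemoryStreams for a quick check), and it times only the BCL Stream.CopyToAsync(Stream, int) overload (.NET 4.5+). Results depend on the machine, so no particular numbers are implied.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

public static class BufferSizeTiming
{
    // Hypothetical harness: times Stream.CopyToAsync(Stream, int) once per buffer size.
    // The factories are called for every run, so each copy starts from fresh streams.
    public static async Task<Dictionary<int, TimeSpan>> TimeCopiesAsync(
        Func<Stream> openSource, Func<Stream> openDestination, params int[] bufferSizes)
    {
        var results = new Dictionary<int, TimeSpan>();
        foreach (int size in bufferSizes)
        {
            using (Stream source = openSource())
            using (Stream destination = openDestination())
            {
                var watch = Stopwatch.StartNew();
                await source.CopyToAsync(destination, size);
                await destination.FlushAsync();
                watch.Stop();
                results[size] = watch.Elapsed; // elapsed time for this buffer size
            }
        }
        return results;
    }
}
```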

        4
  •  11
  •   Kenzi    14 years ago

    Wow, these are all very complicated! Here's my async solution, and it's just one function. The Read() and BeginWrite() both run at the same time.

    using System;
    using System.IO;
    
    /// <summary>
    /// Copies a stream.
    /// </summary>
    /// <param name="source">The stream containing the source data.</param>
    /// <param name="target">The stream that will receive the source data.</param>
    /// <remarks>
    /// This function copies until no more can be read from the stream
    ///  and does not close the stream when done.<br/>
    /// Read and write are performed simultaneously to improve throughput.<br/>
    /// If a pending write does not complete within 60 seconds, the copy times out with an IOException.
    /// </remarks>
    public static void CopyStream(Stream source, Stream target)
    {
        // This stream copy supports a source-read happening at the same time
        // as target-write.  A simpler implementation would be to use just
        // Write() instead of BeginWrite(), at the cost of speed.
    
        byte[] readbuffer = new byte[4096];
        byte[] writebuffer = new byte[4096];
        IAsyncResult asyncResult = null;
    
        for (; ; )
        {
            // Read data into the readbuffer.  The previous call to BeginWrite, if any,
            //  is executing in the background.
            int read = source.Read(readbuffer, 0, readbuffer.Length);
    
            // Ok, we have read some data and we're ready to write it, so wait here
            //  to make sure that the previous write is done before we write again.
            if (asyncResult != null)
            {
                // A 4 KB write must finish within 60 s, so the target must sustain ~0.07 KB/s.
                if (!asyncResult.AsyncWaitHandle.WaitOne(60000))
                    throw new IOException("Stream write timed out.");
                target.EndWrite(asyncResult); // Last step to the 'write'; rethrows any write error.
            }
    
            if (read <= 0)
                return; // source stream says we're done - nothing else to read.
    
            // Swap the read and write buffers so we can write what we read, and we can
            //  then use the other buffer for our next read.
            byte[] tbuf = writebuffer;
            writebuffer = readbuffer;
            readbuffer = tbuf;
    
            // Asynchronously write the data; asyncResult.AsyncWaitHandle will
            // be set when done.
            asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
        }
    }
    
        5
  •  9
  •   Shrike    13 years ago

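    Strange that nobody has mentioned the TPL.
    Here is a very good post by the PFX team (Stephen Toub) about how to implement concurrent asynchronous stream copying. The post contains out-of-date references to the samples, so here is the relevant one:
    get Parallel Extensions Extras from code.msdn, and then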

    var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
    // do what you want with the task, for example wait when it finishes:
    task.Wait();
    

    Also consider using J. Richter's AsyncEnumerator.
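    A sketch of the TPL route applied to the exercise itself, assuming .NET 4.5+ for async/await: Task<int>.Factory.FromAsync wraps the BeginRead/EndRead pair in a Task<int>, so reads stay asynchronous and writes synchronous, with no lambdas. This is not the Parallel Extensions Extras CopyStreamToStreamAsync implementation; TplNetToFile and CopyAsync are hypothetical names, and the parameters are typed as Stream so a NetworkStream/FileStream pair can be passed.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class TplNetToFile
{
    // Reads are asynchronous (BeginRead/EndRead wrapped by FromAsync); writes are synchronous.
    // While a read is pending, no thread waits on it (for streams whose BeginRead is truly
    // asynchronous, such as NetworkStream); the await resumes the loop when the read completes.
    public static async Task CopyAsync(Stream input, Stream output)
    {
        var buffer = new byte[4096];
        while (true)
        {
            int bytesRead = await Task<int>.Factory.FromAsync(
                input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null);
            if (bytesRead == 0)
                break;                          // end of transfer: the read returned 0
            output.Write(buffer, 0, bytesRead); // synchronous write, as the exercise requires
        }
        output.Flush();
    }
}
```

    The caller gets a Task back and can await it, attach a continuation, or call Wait() on it as in the snippet above; only the last option blocks a thread.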

        6
  •  0
  •   willvv    15 years ago

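    You're right: what you're doing is basically a synchronous read, because you use the WaitOne() method, which just stops execution until the data is ready. That is essentially the same as using Read() instead of BeginRead() and EndRead().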

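    What you need to do is use the callback parameter of the BeginRead() method and define a callback method (or a lambda expression) that will be called when the data has been read. In the callback method you check for the end of the stream and write to the output stream. That way you don't block the main thread: it needs neither WaitOne() nor EndRead(), because EndRead() is called inside the callback instead.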

    Hope this helps.
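    To make the callback approach concrete in the shape the question asked for (a named callback, no lambdas, no shared static buffer), here is a minimal sketch. Assumptions: parameters are typed as Stream so any NetworkStream/FileStream pair works, and CallbackNetToFile, CopyState, Start, Done and Error are hypothetical names; Done and Error exist only so a caller can observe completion. Both streams and a per-call buffer travel through BeginRead's state argument and come back via IAsyncResult.AsyncState, which is how the callback reaches the FileStream. EndRead() is still called, but inside the callback rather than on the thread that started the copy.

```csharp
using System;
using System.IO;
using System.Threading;

// Per-call state handed to BeginRead as its `state` argument. Each copy gets its own
// buffer, so concurrent calls no longer share a static byte[] field.
public sealed class CopyState
{
    internal readonly Stream Input;
    internal readonly Stream Output;
    internal readonly byte[] ReadBuffer = new byte[4096];
    public readonly ManualResetEvent Done = new ManualResetEvent(false);
    public Exception Error { get; internal set; }

    internal CopyState(Stream input, Stream output)
    {
        Input = input;
        Output = output;
    }
}

public static class CallbackNetToFile
{
    // Starts the copy and returns without waiting; Done is signaled at end of stream or on error.
    public static CopyState Start(Stream input, Stream output)
    {
        var state = new CopyState(input, output);
        BeginNextRead(state);
        return state;
    }

    private static void BeginNextRead(CopyState state)
    {
        // A named method group (not a lambda) is passed as the AsyncCallback.
        state.Input.BeginRead(state.ReadBuffer, 0, state.ReadBuffer.Length, OnReadCompleted, state);
    }

    private static void OnReadCompleted(IAsyncResult ar)
    {
        var state = (CopyState)ar.AsyncState; // recover both streams and the buffer
        try
        {
            int bytesRead = state.Input.EndRead(ar);
            if (bytesRead == 0)
            {
                state.Output.Flush(); // end of transfer: the read returned 0
                state.Done.Set();
                return;
            }
            state.Output.Write(state.ReadBuffer, 0, bytesRead); // synchronous write
            BeginNextRead(state);                               // chain the next asynchronous read
        }
        catch (Exception ex)
        {
            state.Error = ex; // keep the failure visible instead of losing it on a pool thread
            state.Done.Set();
        }
    }
}
```

    One caveat: if BeginRead completes synchronously, the callback runs on the calling thread, so a long run of synchronous completions nests calls on the stack; production code often checks IAsyncResult.CompletedSynchronously and loops instead of recursing.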