代码之家  ›  专栏  ›  技术社区  ›  KDecker

异步检查值而不使线程陷入停滞?

  •  1
  • KDecker  · 技术社区  · 8 年前

    我的应用程序中有一个模式,我想发出一些命令或启动一些IO工作并等待它完成,或者有一些机制知道它是否完成。

    为此,我计划使用 async / await 图案我想大多数人在编码时都已经达到了这种模式。

    while(something)
        DoNothing();
    

    哪里 DoNothing() 通常会消耗一些CPU时间或完全停止程序。我想解决这个问题的正确方法是 异步 / 等候 图案

    在我的例子中,我想出了以下简单的方法

    public override async Task<Boolean> PerformProcessingAsync()
    {
        StartSomeIOProcessing();
        while (TheIOProcessingResult == null)
            await Task.Yield();
        return true;
    }
    

    它开始一些IO处理,然后等待结果被实例化。与此同时,尽管它在呼唤 Task.Yield 返回到调用上下文,然后可以继续工作,并将此方法的延续(下一个while循环迭代)放到调用堆栈上。

    这是正确的解释吗?这是解决上述情况的正确方法吗?

    编辑: 在我面临的更具体的情况下…我还维护另一个执行IO工作的库,基本上是读写 SerialPort 物体。这个图书馆的工作原理是 ConcurrentQueue 它针对特定端口处理的读取或写入 Task “坐在队列的末尾”,并在工作进行时消耗工作。

    大多数阅读工作只需查询某个值,解析它,然后触发一个事件 NewData(double myNewData) 其由UI收听。在UI中 串行端口控制 s数据。在状态表示中 NewData 事件被处理,该事件更新其对应的任何值。

    写入是以相同的方式完成的,但没有在完成时触发事件,只需将端口写入。判断是否成功的唯一方法是等待读取更新状态。(遗憾的是,由于硬件的工作方式,没有更好的方法可以做到这一点)。


    我当前的应用程序使用这个库来执行IO工作。读取数据会定期发送到库,以使UI保持端口中的新值……当用户单击按钮时,写入数据会发送到库以从硬件中命令某些内容。

    我现在的处境是,我想确保这封信已经发生 以编程方式 。我能想到的唯一方法是将注销发送到库,并等待读取来更新数据和写入效果。

    因此,循环。

    3 回复  |  直到 8 年前
        1
  •  1
  •   Community holdenweb    3 年前

    我现在的情况是,我想确保写入是以编程方式进行的。我能想到的唯一方法是将注销发送到库,并等待读取来更新写入效果的数据。

    是的,这可能是唯一的方法,但有比旋转等待读取更新是否发生更好的方法。

    下面是一个处理过程的示例方法,就像我们在 the chat 我们做到了。总之,您有一个队列 IDataRequest ,在这些请求中,他们持有 TaskCompletionSource<T> 表示数据发送完成。一旦您向设备发出请求并得到响应,就可以设置完成源的结果。我将它与您现有的基于事件的实现结合起来,但老实说,我会删除事件的,只是让调用者在等待结果后更新UI RequestTemp() .

    public interface IDataRequest
    {
        bool TrySetException(Exception ex);
    }
    
    public abstract class DataRequest<T> : IDataRequest
    {
        public TaskCompletionSource<T> RequestTask { get; } = new TaskCompletionSource<T>();
    
        public bool TrySetException(Exception ex)
        {
            return RequestTask.TrySetException(ex);
        }
    }
    
    public class TempRequest : DataRequest<double>
    {
    }
    
    public class RpmRequest : DataRequest<int>
    {
    }
    
    public sealed class DeviceManager : IDisposable
    {
    
        private readonly Task _workerThread;
        private readonly BlockingCollection<IDataRequest> _queue;
        private readonly SerialPort _serialPort;
    
        public DeviceManager()
        {
            _queue = new BlockingCollection<IDataRequest>();
            _workerThread = Task.Factory.StartNew(ProcessQueue, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            _serialPort = //...
        }
        
        public event EventHandler<double> TempUpdate;
        public event EventHandler<int> RpmUpdate;
    
        public Task<double> RequestTemp()
        {
            var request = new TempRequest();
            _queue.Add(request);
            return request.RequestTask.Task;
        }
    
        public Task<int> RequestRpm()
        {
            var request = new RpmRequest();
            _queue.Add(request);
            return request.RequestTask.Task;
        }
        public void Dispose()
        {
            _queue.CompleteAdding();
            _workerThread.Wait();
        }
    
        private void ProcessQueue()
        {
            foreach (var dataRequest in _queue.GetConsumingEnumerable())
            {
                try
                {
                    if (dataRequest is TempRequest)
                    {
                        DoTempRequest((TempRequest)dataRequest);
                    }
                    else if (dataRequest is RpmRequest)
                    {
                        DoRpmRequest((RpmRequest)dataRequest);
                    }
                    else
                    {
                        throw new NotSupportedException($"A Request of type {dataRequest.GetType()} is not supported.");
                    }
                }
                catch (Exception ex)
                {
                    dataRequest.TrySetException(ex);
                }
            }
        }
    
        private void DoTempRequest(TempRequest dataRequest)
        {
            _serialPort.WriteLine("Temp ?");
            var line = _serialPort.ReadLine();
            double result;
    
            //I am deliberately using Parse instead of TryParse so responses that 
            //fail to parse will throw and get their exception propagated up via the 
            //catch in ProcessQueue().
            result = double.Parse(line);
            
            //Sends the info back to the caller saying it is done and what the result was.
            dataRequest.RequestTask.TrySetResult(result);
    
            //Raises the event so subscribers know the new value.
            OnTempUpdate(result);
        }
    
        private void DoRpmRequest(RpmRequest dataRequest)
        {
            _serialPort.WriteLine("RPM ?");
            var line = _serialPort.ReadLine();
            int result;
            result = int.Parse(line);
            
            dataRequest.RequestTask.TrySetResult(result);
            OnRpmUpdate(result);
            
        }
    
        private void OnTempUpdate(double result)
        {
            TempUpdate?.Invoke(this, result);
        }
    
        private void OnRpmUpdate(int result)
        {
            RpmUpdate?.Invoke(this, result);
        }
    }
    
        2
  •  0
  •   d.moncada    8 年前

    我会提取 StartSomethingIOProcessing 成为一个独立的 Task 返回结果。 Await 然后检查结果。

    public async PerformProcessingAsync()
    {
        var ioProcessingResult = await StartSomeIOProcessing();
        return (null != ioProcessingResult);
    }
    
    private Task<TheIOProcessingResultType> StartSomeIOProcessing()
    {
        return Task.Run(()=>
        {
            return StartSomeIOProcessing();
        });
    }
    
        3
  •  -1
  •   Jay    8 年前

    我相信您误解了异步/等待操作。await关键字实际上用于等待异步操作完成,因此您不再需要实现while循环并检查它是否为空。

    public override async Task<Boolean> PerformProcessingAsync()
    {
        await Task.Run((() =>
        {
                StartSomeIOProcessing();
        });
        return true;
    }