代码之家  ›  专栏  ›  技术社区  ›  BFree

反应式框架(RX)和异步处理事件

  •  3
  • BFree  · 技术社区  · 14 年前

    所以我只是在玩RX并学习它。我开始玩事件,想知道如何订阅事件,并异步批量处理结果。请允许我用代码解释:

    引发事件的简单类:

    public class EventRaisingClass
    {
       public event EventHandler<SomeEventArgs> EventOccured;
    
       //some other code that raises event...
    }
    
    public class SomeEventArgs : EventArgs
    {
        public SomeEventArgs(int data)
        {
            this.SomeArg = data;
        }
    
        public int SomeArg { get; private set; }
    }
    

    然后我的主人:

    public static void Main(string[] args)
    {
        var eventRaiser = new EventRaisingClass();
        IObservable<IEvent<SomeEventArgs>> observable = 
            Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);
    
        IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);
    
        //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
        bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...
    
    }
    

    正如您在我的注释中看到的,当您像那样调用subscribe时,所有的订阅代码都会同步发生。当有新的一批事件要处理时,有没有办法使用RX在不同的线程上调用Subscribe?

    2 回复  |  直到 14 年前
        1
  •  2
  •   Sergey Aldoukhov    14 年前
    bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...
    

    SubscribeOn是指定所谓的 订阅副作用 “正在发生。例如,你的observable可以在每次有人订阅时打开一个文件。

    ObserveOn是指定每次有新值时调用观察者的时间表。在实践中,它比订阅更常用。

        2
  •  2
  •   Stephen Cleary    14 年前

    我相信你在找 SubscribeOn ObserveOn ,通过 IScheduler . 下面有几个内置的调度程序 System.Concurrency ;其中一些使用当前的线程,而另一些使用特定的线程。

    This video 有更多关于调度程序概念的信息。

    Rx团队最近还发布了 hands-on labs 最接近教程的文档。