代码之家  ›  专栏  ›  技术社区  ›  Umair

IObservable收集当前流中的所有元素并执行批量操作

  •  2
  • Umair  · 技术社区  · 7 年前

    示例代码

    async Task Main()
    {
        var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
    
        entities = Square(entities);
        entities = Print(entities);
        entities = Add(entities, 1);
        entities = await SaveObservable(entities);
        entities = Add(entities, 2);
    
        await PrintAll(entities);
    }
    
    public IObservable<double> Square(IObservable<int> source)
        => source.Select(i => Math.Pow(i, 2.0));
    
    public IObservable<double> Add(IObservable<double> source, double add)
        => source.Select(i => i + add);
    
    public IObservable<T> Print<T>(IObservable<T> source)
        => source.Do(i => Console.WriteLine("Print: {0}", i));
    
    public async Task<IObservable<T>> SaveObservable<T>(IObservable<T> source)
    {
        var last = await source.LastOrDefaultAsync();
        Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
    
        // await database.SaveChangesAsync();
    
        return source;
    }
    
    public async Task PrintAll(IObservable<double> source)
        => await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
    

    想象一下,在我的 SaveObservable 函数,我想在数据库上执行一个操作,例如批量保存添加到数据库上下文中的所有实体,这样我就可以让数据库引擎(例如SQL)填充任何数据库生成的ID。

    所以我想要这个函数做的是收集传入的可观察流,调用数据库。SaveChangesAsync,然后基本上通过返回传入的流来继续。

    但是,如果我运行上面的代码,则 Print 再次调用函数。我不希望以前调用的可观察对象的任何投影/映射再次运行。

    我将如何实现这一点?有可能吗?

    2 回复  |  直到 7 年前
        1
  •  3
  •   Shlomo    7 年前

    首先,你的评论很中肯:这可能最好用简单的、基于拉的编程来完成。

    至于你的代码,你有两个问题:

    1) 可观察对象类似于项目管道。您的代码基本上设置了两个管道:

    entities -> Square -> Print -> Add(1) -> SaveObservable
    entities -> Square -> Print -> Add(1) -> Add(2) -> PrintAll
    

    这就是为什么你会看到 Print 打了两次电话。如果您希望看到它被调用一次,您可能希望管道执行以下操作:

    entities -> Square -> Print -> Add(1) -> SaveObservable
                                         \-> Add(2) -> PrintAll
    

    2) 然而,由于混合了 async IObservable . 制作多浇铸管道的懒散方法是使用 .Publish().RefCount() 添加调用后:

    entities = Square(entities);
    entities = Print(entities);
    var entitiesAdded = Add(entities, 1).Publish().RefCount();
    var _ = await SaveObservable(entitiesAdded);
    entities = Add(entitiesAdded, 2);
    
    await PrintAll(entities);
    

    但这不起作用,因为 Publish 使可观察的是热的,一旦用于 SaveObservable ,项目不会再次发射。另一种方法是 .Publish() 后跟一个 .Connect() 当所有的孩子都联系在一起,但这并不适合 await .

    代码的工作方式是添加 .Replay().RefCount() ,但它可能表现不好,因为Rx必须设置一个内部缓冲区,该缓冲区在订阅生命周期内保存所有项目。

    var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
    
    entities = Square(entities);
    entities = Print(entities);
    entities = Add(entities, 1).Replay().RefCount();
    entities = await SaveObservable(entities);
    entities = Add(entities, 2);
    
    await PrintAll(entities);
    

    有关热/冷观测的更多信息, 出版 , Connect , Replay RefCount , see here .

        2
  •  2
  •   Enigmativity    7 年前

    您有两个订阅的原因是由于以下两行:

    1. var last = await source.LastOrDefaultAsync();
    2. await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));

    两者都会导致对原始源的订阅可见。

    第一个是有效的:

    var last = await
        new double[] { 1, 2, 4, 8, 16, 32 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0))
            .Do(i => Console.WriteLine("Print: {0}", i))
            .Select(i => i + 1)
            .LastOrDefaultAsync();
    

    第二个是有效的:

    await
        new double[] { 1, 2, 4, 8, 16, 32 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0))
            .Do(i => Console.WriteLine("Print: {0}", i))
            .Select(i => i + 1)
            .Select(i => i + 2)
            .ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
    

    这个问题的答案是避免任何 async / await 密码无论如何,Rx处理所有同步工作的能力都远远优于TPL。

    你也可以 等候 不涉及任务的可观察的。

    也许可以这样尝试您的代码:

    async void Main()
    {
        var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
    
        entities = Square(entities);
        entities = Print(entities);
        entities = Add(entities, 1);
        entities = SaveObservable(entities);
        entities = Add(entities, 2);
    
        await PrintAll(entities);
    }
    
    public IObservable<double> Square(IObservable<double> source)
        => source.Select(i => Math.Pow(i, 2.0));
    
    public IObservable<double> Add(IObservable<double> source, double add)
        => source.Select(i => i + add);
    
    public IObservable<T> Print<T>(IObservable<T> source)
        => source.Do(i => Console.WriteLine("Print: {0}", i));
    
    public IObservable<T> SaveObservable<T>(IObservable<T> source)
    {
        return Observable.Create<T>(o =>
        {
            var last = default(T);
            return 
                source
                    .Do(
                        t => last = t,
                        () =>
                        {
                            Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
                        })
                    .Subscribe(o);
        });
    }
    
    public IObservable<double> PrintAll(IObservable<double> source)
        => source.Do(i => Console.WriteLine("PrintAll: {0}", i));
    

    这将产生以下输出:

    Print: 1
    PrintAll: 4
    Print: 4
    PrintAll: 7
    Print: 16
    PrintAll: 19
    Print: 64
    PrintAll: 67
    Print: 256
    PrintAll: 259
    Print: 1024
    PrintAll: 1027
    Collected observable and saving to the db. Last element is '1025'
    

    允许 SaveObservable 要在继续管道之前完成,您需要添加一个 .ToArray() 使用这种方法。这改变了 IObservable<T> 生成零个或多个值到 IObservable<T[]> 这将生成一个包含零个或多个元素的数组。因此,它必须在继续之前完成所有值。

    尝试更改 保存可观察 收件人:

    public IObservable<T> SaveObservable<T>(IObservable<T> source)
    {
        return
            source
                .ToArray()
                .Do(ts =>
                {
                    var last = ts.Last();
                    Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
                })
                .SelectMany(t => t);
    }
    

    现在输出:

    Print: 1
    Print: 4
    Print: 16
    Print: 64
    Print: 256
    Print: 1024
    Collected observable and saving to the db. Last element is '1025'
    PrintAll: 4
    PrintAll: 7
    PrintAll: 19
    PrintAll: 67
    PrintAll: 259
    PrintAll: 1027
    

    代码现在实际上是这样的:

    await
        new double[] { 1, 2, 4, 8, 16, 32 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0))
            .Do(i => Console.WriteLine("Print: {0}", i))
            .Select(i => i + 1)
            .ToArray()
            .Do(ts =>
            {
                var last = ts.Last();
                Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
            })
            .SelectMany(t => t)
            .Select(i => i + 2)
            .Do(i => Console.WriteLine("PrintAll: {0}", i));