代码之家  ›  专栏  ›  技术社区  ›  Peter Lillevold Rene

为什么ParallelQuery<t>在转换为Observable时不起作用?

  •  3
  • Peter Lillevold Rene  · 技术社区  · 14 年前

    我有一个可观察的集合,我想并行处理,然后在过滤时观察处理的值,最后订阅一个接收过滤值的处理程序。

    我的示例在语法上是正确的,编译也很好,当我运行代码时, Where 对执行筛选的语句进行了计算。但是订阅没有数据。如果我删除 AsParallel 以便在常规 IEnumerable ,数据通过,一切按预期工作。

    这是我的示例,对字符串进行一些处理:

    // Generate some data every second
    var strings = Observable.Generate(() =>
        new TimeInterval<Notification<string>>(
            new Notification<string>
                .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
    
    // Process the data in parallel
    var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                          select "Parallel " + value;
    
    // Filter and observe
    var data = String.Empty;
    parallelStrings
        .Where(value => !String.IsNullOrEmpty(value))
        .ToObservable()
        .Subscribe(value => data = value);
    

    下一个奇怪的事情是如果我使用 TakeWhile 操作符,在我看来,它在概念上类似于where,观察parallelquery的工作方式如预期的那样:

    // Filter and observe
    var data = String.Empty;
    parallelStrings
        .TakeWhile(cs => !String.IsNullOrEmpty(cs))
        .ToObservable()
        .Subscribe(value => data = value);
    

    向订阅中添加一些日志代码表明在 ToObservable 转换,但不在以下时间之后:

    1.    var data = String.Empty;
    2.    parallelStrings
    3.        .Where(value => !String.IsNullOrEmpty(value))
    4.        .Select(value => value)
    5.        .ToObservable()
    6.        .Select(value => value)
    7.        .Subscribe(value => data = value);
    

    第4行lambda中的断点被命中,而第6行lambda中的断点从未被命中。

    为什么会 取而代之 使数据通过订阅服务器 在哪里? 不是吗?

    如果这很重要,我将在Visual Studio 2010 RC中使用面向.NET 4.0框架客户端配置文件的项目开发代码。

    更新 基于 @Sergeys answer 我修改了 在哪里? 过滤器。以下代码按预期工作:

    var processedStrings = from value in strings
                           let processedValue = "Parallel " + value
                           where !String.IsNullOrEmpty(processedValue)
                           select processedValue;
    
    var data = String.Empty;
    processedStrings
        .ToEnumerable()
        .AsParallel()
        .ToObservable()
        .Subscribe(value => data = value );
    

    首先转换最初的可观测数据还是有点尴尬 processedStrings 变成一个可枚举的,以便将其并行化,然后将其转换回一个可观测的,以便订阅最终的结果。

    2 回复  |  直到 14 年前
        1
  •  2
  •   Sergey Aldoukhov    14 年前

    C# 4.0 in a Nutshell :


    目前,PLINQ可以并行化的内容存在一些实际限制。这些 随后的服务包和框架版本可能会放宽限制。 以下查询运算符阻止查询并行化,除非 源元素处于其原始索引位置:

    • take、takewhile、skip和skipwhile
    • select、selectmany和elementat的索引版本

    大多数查询运算符更改元素的索引位置(包括那些 删除元素,如where)。这意味着如果要使用前面的 运算符,它们通常需要在查询开始时


    因此,实际上,使用takewhile可以防止.asparallel()并行化。很难说 为什么 在哪里取消订阅,但把它放在天门冬氨酸之前 可以 解决问题。

        2
  •  2
  •   Jon Skeet    14 年前

    TakeWhile 在概念上并不等同于 Where 因为这取决于顺序。我怀疑这个问题是 事实上 按顺序执行(参见 this blog post )尝试呼叫 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 在你 取而代之 例如,我怀疑你会看到同样的结果。

    我不知道为什么它在平行的情况下不起作用…我可以建议您进行一些日志记录,看看数据能达到什么程度吗?例如,您可以使用select执行有用的日志记录,select在日志记录后返回原始项。