代码之家  ›  专栏  ›  技术社区  ›  sofsntp

刮取网站多个页面的并行请求

  •  0
  • sofsntp  · 技术社区  · 7 年前

    我想刮一个网站与有趣的数据大量网页,但由于源代码是非常大的,我想多线程和限制超载。 我使用 Parallel.ForEach 开始10个任务中的每一块,我在主屏幕上等待 for 循环,直到活动线程数开始下降到阈值以下。为此,我使用一个活动线程计数器,当使用 WebClient DownloadStringCompleted 的事件

    DownloadStringTaskAsync 而不是 DownloadString 并等待每个线程在 平行的ForEach公司 activeThreads )和a Thread.Sleep 在主foor循环中。

    await DownloadStringTaskAsync 而不是 下载字符串 应该在等待下载的字符串数据到达时释放一个线程来提高速度吗?

    回到最初的问题,有没有一种方法可以更优雅地使用第三方物流,而不需要涉及计数器?

    private static volatile int activeThreads = 0;
    
    public static void RecordData()
    {
      var nbThreads = 10;
      var source = db.ListOfUrls; // Thousands urls
      var iterations = source.Length / groupSize; 
      for (int i = 0; i < iterations; i++)
      {
        var subList = source.Skip(groupSize* i).Take(groupSize);
        Parallel.ForEach(subList, (item) => RecordUri(item)); 
        //I want to wait here until process further data to avoid overload
        while (activeThreads > 30) Thread.Sleep(100);
      }
    }
    
    private static async Task RecordUri(Uri uri)
    {
       using (WebClient wc = new WebClient())
       {
          Interlocked.Increment(ref activeThreads);
          wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount);
          var jsonData = "";
          RootObject root;
          jsonData = await wc.DownloadStringTaskAsync(uri);
          var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
          RecordData(root)
        }
    }
    
    2 回复  |  直到 7 年前
        1
  •  2
  •   Enigmativity    7 年前

    如果你想要一个优雅的解决方案,你应该使用微软的反应式框架。很简单:

    var source = db.ListOfUrls; // Thousands urls
    
    var query =
        from uri in source.ToObservable()
        from jsonData in Observable.Using(
            () => new WebClient(),
            wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
        select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };
    
    IDisposable subscription =
        query.Subscribe(x =>
        {
            /* Do something with x.uri && x.json */
        });
    

    这就是全部代码。它是一个很好的多线程系统,并且可以控制。

    只需NuGet“系统。反应”即可获得位。

        2
  •  -1
  •   Venson    7 年前
    Parallel.ForEach
    

    将创建ProcessorCount任务,以执行源枚举中每个项的函数。它会注意不要有太多的任务,并等待所有项目和任务执行。

    Task.WhenAll
    

    但是你的代码中有一些错误。函数 RecordUri 将返回一个必须等待的任务,否则ForEach将创建越来越多的任务,因为函数永远不知道当前任务何时完成。还有一个问题是,您在一个任务中创建了一个任务,第一个任务不做其他事情,然后等待第一个任务。

    Parallel.ForEach https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

    编辑

    使用Wait DownloadStringTaskAsync而不是DownloadString是否应该通过在等待DownloadString数据到达时释放线程来提高速度?

    不同之处在于编译异步代码时编译器将产生的开销。这个 DownloadStringTaskAsync

    我的方法是:使用 synchronous method 在你的平行线内。ForEach。螺纹将由PLinq完成,您可以自由继续。