代码之家  ›  专栏  ›  技术社区  ›  Nicklas2751

ForkJoinFramework只使用两个worker

  •  0
  • Nicklas2751  · 技术社区  · 7 年前

    现在我的问题来了,直到每个线程都在同一时间完成了他的50个全部四个工作快速anf。但是在两个停止工作并等待加入之后,只有另外两个正在工作并创建新的分叉和爬行页面。

    为了可视化这一点,我计算了线程爬行的mouch URL的数量,并让JavaFX gui显示出来。

    ForkJoinFramewok只使用四个允许线程中的两个,这有什么错?我能做些什么来改变它?

    以下是我的任务计算方法:

        LOG.debug(
           Thread.currentThread().getId() + " Starting new Task with " 
              + urlsToCrawl.size() + " left."
        );
        final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
        {
            urlsToCrawlSubset.offer(urlsToCrawl.poll());
        }
        LOG.debug(
           Thread.currentThread().getId() + " Crated a Subset with " 
           + urlsToCrawlSubset.size() + "."
        );
        LOG.debug(
           Thread.currentThread().getId() 
           + " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
        );
    
        if (urlsToCrawl.isEmpty())
        {
            LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
            crawlPage(urlsToCrawlSubset);
        }
        else
        {
            LOG.debug(
               Thread.currentThread().getId() 
                  + " Creating a new Task and crawling the subset."
            );
            final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
            otherTask.fork();
            crawlPage(urlsToCrawlSubset);
            taskResults.addAll(otherTask.join());
        }
        return taskResults;
    

    enter image description here

    P、 如果我允许最多80个线程,它将使用它们,直到每个有50个URL被爬网,然后只使用两个。

    如果您感兴趣,以下是完整的源代码: https://github.com/mediathekview/MServer/tree/feature/cleanup

    1 回复  |  直到 7 年前
        1
  •  0
  •   Nicklas2751    7 年前

    我把它修好了。我的错误是,我把一个小的protion分开,然后工作,然后等待,而不是把它分成两半,然后和其余的另一半一起再次调用我自己,等等。

    @Override
    protected Set<T> compute()
    {
        if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
        {
            crawlPage(urlsToCrawl);
        }
        else
        {
            final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
            final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
            leftTask.fork();
            taskResults.addAll(rightTask.compute());
            taskResults.addAll(leftTask.join());
        }
        return taskResults;
    }
    
    private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
    {
        final int halfSize = aBaseQueue.size() / 2;
        final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < halfSize; i++)
        {
            urlsToCrawlSubset.offer(aBaseQueue.poll());
        }
        return urlsToCrawlSubset;
    }