代码之家  ›  专栏  ›  技术社区  ›  BKE

Java不使用所有可用的CPU

  •  12
  • BKE  · 技术社区  · 6 年前

    我有一个长时间运行的计算,需要对一长串输入进行计算。这些计算是独立的,所以我想将它们分发给几个CPU。我正在使用Java 8。

    代码的框架如下所示:

    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    
    MyService myService = new MyService(executorService);
    
    List<MyResult> results =
                myInputList.stream()
                         .map(myService::getResultFuture)
                         .map(CompletableFuture::join)
                         .collect(Collectors.toList());
    
    executorService.shutdown();
    

    负责计算的主要功能如下所示:

    CompletableFuture<MyResult> getResultFuture(MyInput input) {
        return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
    }
    

    长时间运行的计算是无状态的,不执行任何IO。

    我希望这段代码能够使用所有可用的CPU,但这并没有发生。例如,在具有72个CPU和 numThreads=72 (甚至例如。 numThreads=500 ),cpu使用率最高为500-1000%,如htop所示:

    htop

    根据线程转储,许多计算线程正在等待,即:

    "pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
       Locked ownable synchronizers:
        - None
    

    所有计算线程都在等待相同的锁。在转储时,只有5个计算线程可运行,其余线程正在等待。

    锁定的原因是什么?为什么我不能使用所有CPU?

    1 回复  |  直到 6 年前
        1
  •  14
  •   ernest_k Petronella    6 年前

    您正在提交作业并致电 join() 之后,等待异步作业完成。

    流中间步骤是按元素执行的,这意味着中间步骤 .map(CompletableFuture::join) 一次在一个元素上运行(更糟糕的是,它是一个连续的流),而不确保所有元素都已完成提交步骤。这会导致线程在等待每个计算完成时阻塞。

    在开始调用之前,必须强制提交所有作业 join() 在他们身上:

    List<MyResult> results =
        myInputList.stream()
                   .map(myService::getResultFuture)
                   .collect(Collectors.toList()).stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList());
    

    如果你能表达出你想用 results 列表作为完成所有操作时要调用的操作,您可以以不阻止线程的方式实现该操作 join() :

    List<CompletableFuture<MyResult>> futures = myInputList.stream()
        .map(myService::getResultFuture)
        .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
        .thenRun(() -> {
            List<MyResult> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
            // perform action with results
        });
    

    它仍在呼叫 join() 检索结果,但此时,所有未来都已完成,因此调用方不会被阻止。