代码之家  ›  专栏  ›  技术社区  ›  kaqqao

如何使用ExecutorService轮询直到结果到达

  •  16
  • kaqqao  · 技术社区  · 8 年前

    我有一个场景,我必须轮询远程服务器,检查任务是否已完成。一旦有了,我就进行不同的调用来检索结果。

    我最初认为我应该使用 SingleThreadScheduledExecutor 具有 scheduleWithFixedDelay 投票:

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);
    
    public void poll(String jobId) {
       boolean jobDone = remoteServer.isJobDone(jobId);
       if (jobDone) {
           retrieveJobResult(jobId);
       }
    }
    

    但既然我只能提供 Runnable 带固定延迟的时间表 它不能返回任何东西,我不明白 future future.get() 甚至是卑鄙的?我在等待什么结果?

    第一次检测到远程任务已完成时,我希望执行不同的远程调用,并将其结果设置为 .我想我可以用CompletableFuture来做这件事,我希望 poll 方法,然后将其转发给 retrieveTask 最终将完成它的方法:

    CompletableFuture<Object> result = new CompletableFuture<Object>();
    ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);
    
    public void poll(String jobId, CompletableFuture<Object> result) {
       boolean jobDone = remoteServer.isJobDone(jobId);
       if (jobDone) {
           retrieveJobResult(jobId, result);
       }
    }
    
    public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
        Object remoteResult = remoteServer.getJobResult(jobId);
        result.complete(remoteResult);
    }
    

    但这有很多问题。一方面, CompletableFuture 甚至似乎不打算用于这种用途。相反,我应该这样做 CompletableFuture.supplyAsync(() -> poll(jobId)) 我想,但我该如何正确关闭 executor 并取消 将来 当我的 完全未来 是否已取消/完成?感觉轮询应该以完全不同的方式实现。

    3 回复  |  直到 8 年前
        1
  •  13
  •   Andrew Rueckert    3 年前

    我认为CompletableFutures是一个很好的方法:

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    
    private void run() {
        final Object jobResult = pollForCompletion("jobId1")
                .thenApply(jobId -> remoteServer.getJobResult(jobId))
                .get();
    
    }
    
    private CompletableFuture<String> pollForCompletion(final String jobId) {
        CompletableFuture<String> completionFuture = new CompletableFuture<>();
        final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
            if (remoteServer.isJobDone(jobId)) {
                completionFuture.complete(jobId);
            }
        }, 0, 10, TimeUnit.SECONDS);
        completionFuture.whenComplete((result, thrown) -> {
            checkFuture.cancel(true);
        });
        return completionFuture;
    }
    
        2
  •  2
  •   Jason Hu    8 年前

    CompletableFuture 有两个角色:一个是传统的未来,它为任务执行和状态查询提供异步源;另一个是我们通常所说的承诺。一个承诺,如果你还不知道,可以被视为未来的建设者和它的完成来源。因此,在这种情况下,直觉上需要一个承诺,这正是您在这里使用的情况。您所担心的示例是介绍第一种用法,但不是承诺方式。

    接受这一点,你应该更容易开始处理你的实际问题。我认为promise应该有两个角色,一个是通知任务完成轮询,另一个是在完成时取消计划任务。最终解决方案如下:

    public CompletableFuture<Object> pollTask(int jobId) {
        CompletableFuture<Object> fut = new CompletableFuture<>();
        ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
        fut.thenAccept(ignore -> sfuture.cancel(false));
        return fut;
    }
    
    private void _poll(int jobId, CompletableFuture<Object> fut) {
        // whatever polls
        if (isDone) {
            fut.complete(yourResult);
        }
    }
    
        3
  •  1
  •   lance-java    3 年前

    我为此创建了一个通用实用程序 this answer 使用 Supplier<Optional<T>> 由此每次投票都可以返回 Optional.empty() 直到值准备好。我还实现了 timeout 所以 TimeoutException 如果超过最大时间,则抛出。

    用法:

    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
    CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
       .supplier(supplier)
       .executorService(scheduledExecutor)
       .timeUnit(TimeUnit.SECONDS)
       .initialDelay(5)
       .period(5)
       .timeout(60 * 5)
       .build();
    

    ScheduledCompletableFuture.java

    public class ScheduledCompletableFuture {
        public static class ScheduledCompletableFutureBuilder<T> {
            private Supplier<Optional<T>> supplier;
            private ScheduledExecutorService executorService;
            private Long initialDelay;
            private Long period;
            private Long timeout;
            private TimeUnit timeUnit;
    
            public ScheduledCompletableFutureBuilder() {
            }
    
            public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
                this.supplier = supplier;
                return this;
            }
    
            public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
                this.executorService = executorService;
                return this;
            }
    
            public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
                this.initialDelay = initialDelay;
                return this;
            }
    
            public ScheduledCompletableFutureBuilder<T> period(long period) {
                this.period = period;
                return this;
            }
    
            public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
                this.timeout = timeout;
                return this;
            }
    
            public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
                this.timeUnit = timeUnit;
                return this;
            }
    
            public CompletableFuture<T> build() {
                // take a copy of instance variables so that the Builder can be re-used
                Supplier<Optional<T>> supplier = this.supplier;
                ScheduledExecutorService executorService = this.executorService;
                Long initialDelay = this.initialDelay;
                Long period = this.period;
                Long timeout = this.timeout;
                TimeUnit timeUnit = this.timeUnit;
    
                CompletableFuture<T> completableFuture = new CompletableFuture<>();
                long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
                Runnable command = () -> {
                    Optional<T> optional = supplier.get();
                    if (optional.isPresent()) {
                        completableFuture.complete(optional.get());
                    } else if (System.currentTimeMillis() > endMillis) {
                        String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
                        completableFuture.completeExceptionally(new TimeoutException(msg));
                    }
                };
                ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
                return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
            }
        }
    
        public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
            return new ScheduledCompletableFutureBuilder<>();
        }
    }