代码之家  ›  专栏  ›  技术社区  ›  kornisb

使用默认值中断CompletableFuture

  •  0
  • kornisb  · 技术社区  · 7 年前

    假设我有3个服务。 首先我打电话 serviceA ,它返回一个 CompletableFuture . 之后我打电话给 serviceB serviceC 平行地( thenCompose() ). 在我有了所有的结果,我想结合所有3个结果,并返回给一些调用它。 在调用者中,我希望在整个过程中总共等待X millseconds,以便:

    • 如果我在 服务A
    • 如果我在 服务B 调用正在进行:返回一些默认值(它们是可选的)。 这就是我尝试使用 getNow(fallback) 方法 可完成的未来

    如果我在中使用长延迟,请检查下面的代码片段 服务B 电话,我总是以一个 TimeoutException . 我该怎么做?

    public CompletableFuture<Result> getFuture() {
        CompletableFuture<A> resultA = serviceA.call();
        CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
        CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
        return CompletableFuture.allOf(resultB, resultC)
                .thenApply(ignoredVoid -> combine(
                        resultA.join(),
                        resultB.getNow(fallbackB),
                        resultC.getNow(fallbackC));
    }
    
    public Result extractFuture(CompletableFuture<Result> future) {
        Result result;
        try {
            result = future.get(timeOut, MILLISECONDS);
        } catch (ExecutionException ex) {
            ...
        } catch (InterruptedException | TimeoutException ex) {
            // I always ends up here...
        }
        return result;
    }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   Holger    7 年前

    返回的未来 .allOf(resultB, resultC) resultB resultC 因此,依赖函数 ignoredVoid -> combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC) 只有在以下情况下才会进行评估 结果b 结果c

    通常不可能对 get() 在这些函数内调用。考虑到可以有任意数量的 get() thenApply 仅评估一次。

    在以下时间内处理使用者指定超时的唯一方法 getFuture() 将其更改为返回接收超时的函数:

    interface FutureFunc<R> {
        R get(long time, TimeUnit u) throws ExecutionException;
    }
    public FutureFunc<Result> getFuture() {
        CompletableFuture<A> resultA = serviceA.call();
        CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
        CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
        CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC)
            .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join()));
        return (t,u) -> {
            try {
                return optimistic.get(t, u);
            } catch (InterruptedException | TimeoutException ex) {
                return combine(resultA.join(), resultB.getNow(fallbackB),
                                               resultC.getNow(fallbackC));
            }
        };
    }
    
    public Result extractFuture(FutureFunc<Result> future) {
        Result result;
        try {
            result = future.get(timeOut, MILLISECONDS);
        } catch (ExecutionException ex) {
            ...
        }
        return result;
    }
    

    现在,可以进行不同超时的不同呼叫,只要B或C尚未完成,结果可能不同。并不是说关于 combine 方法也可能需要一些时间。

    您可以将函数更改为

    return (t,u) -> {
        try {
            if(resultB.isDone() && resultC.isDone()) return optimistic.get();
            return optimistic.get(t, u);
        } catch (InterruptedException | TimeoutException ex) {
            return combine(resultA.join(), resultB.getNow(fallbackB),
                                           resultC.getNow(fallbackC));
        }
    };
    

    等待可能已经运行的 结合 . 在任何一种情况下,都不能保证结果在指定时间内交付,因为即使使用了B和C的回退值,也会执行 结合 这可能需要任意时间。

    public FutureFunc<Result> getFuture() {
        CompletableFuture<A> resultA = serviceA.call();
        CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
        CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
        CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
        CompletableFuture<Result> result = bAndC
            .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(),
                                                              resultC.join()));
        return (t,u) -> {
            try {
                bAndC.get(t, u);
            } catch (InterruptedException|TimeoutException ex) {
                resultB.complete(fallbackB);
                resultC.complete(fallbackC);
            }
            try {
                return result.get();
            } catch (InterruptedException ex) {
                throw new ExecutionException(ex);
            }
        };
    }
    

    这样,所有查询都可以在单个 FutureFunc 将始终返回相同的结果,即使它基于第一个超时引起的回退值。该变体还始终排除执行 结合

    当然,如果根本不打算使用不同的超时,那么可以重构 getFuture() 提前获得所需的超时,例如作为参数。这将大大简化实施,并可能再次带来未来:

    public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) {
        CompletableFuture<A> resultA = serviceA.call();
        CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
        CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
        ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
        e.schedule(() -> resultB.complete(fallbackB), timeOut, u);
        e.schedule(() -> resultC.complete(fallbackC), timeOut, u);
        CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
        bAndC.thenRun(e::shutdown);
        return bAndC.thenApply(ignoredVoid ->
                               combine(resultA.join(), resultB.join(), resultC.join()));
    }