代码之家  ›  专栏  ›  技术社区  ›  Nik

有没有一种简单的方法可以将未来变为未来?

  •  19
  • Nik  · 技术社区  · 14 年前

    我有一些代码向另一个线程提交了一个请求,这个线程可能也可能不会向另一个线程提交该请求。它产生一个返回类型 Future<Future<T>> . 有没有什么不可耻的方法可以立即把这个变成 Future<T> 等待整个未来链的完成?

    我已经在使用guava库来处理其他有趣的并发事务,并作为Google集合的替代品,它工作得很好,但是我似乎找不到适合这个案例的东西。

    5 回复  |  直到 12 年前
        1
  •  5
  •   Chris Povirk    12 年前

    番石榴13 Futures.dereference 这样做。它需要一个 ListenableFuture<ListenableFuture> 而不是平原 Future<Future> . (在平原上操作) Future 需要一个makelistenable调用,每个调用在任务的生命周期内都需要一个专用线程(通过方法的新名称更清楚地表明, JdkFutureAdapters.listenInPoolThread ))

        2
  •  7
  •   Geoff Reedy    14 年前

    另一个可能的实现使用了guava库,并且非常简单。

    import java.util.concurrent.*;
    import com.google.common.util.concurrent.*;
    import com.google.common.base.*;
    
    public class FFutures {
      public <T> Future<T> flatten(Future<Future<T>> future) {
        return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
          public ListenableFuture<T> apply(Future<T> f) {
            return Futures.makeListenable(f);
          }
        });
      }
    }
    
        3
  •  1
  •   Geoff Reedy    14 年前

    我认为这是最好的办法来执行未来的合同。我采取了尽可能不公开的策略,以确保它符合合同。尤其是GET的超时实现。

    import java.util.concurrent.*;
    
    public class Futures {
      public <T> Future<T> flatten(Future<Future<T>> future) {
        return new FlattenedFuture<T>(future);
      }
    
      private static class FlattenedFuture<T> implements Future<T> {
        private final Future<Future<T>> future;
    
        public FlattenedFuture(Future<Future<T>> future) {
          this.future = future;
        }
    
        public boolean cancel(boolean mayInterruptIfRunning) {
          if (!future.isDone()) {
            return future.cancel(mayInterruptIfRunning);
          } else {
            while (true) {
              try {
                return future.get().cancel(mayInterruptIfRunning);
              } catch (CancellationException ce) {
                return true;
              } catch (ExecutionException ee) {
                return false;
              } catch (InterruptedException ie) {
                // pass
              }
            }
          }
        }
    
        public T get() throws InterruptedException, 
                              CancellationException, 
                              ExecutionException 
        {
          return future.get().get();
        }
    
        public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                                                         CancellationException, 
                                                         ExecutionException, 
                                                         TimeoutException 
        {
          if (future.isDone()) {
            return future.get().get(timeout, unit);
          } else {
            return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
          }
        }
    
        public boolean isCancelled() {
          while (true) {
            try {
              return future.isCancelled() || future.get().isCancelled();
            } catch (CancellationException ce) {
              return true;
            } catch (ExecutionException ee) {
              return false;
            } catch (InterruptedException ie) {
              // pass
            }
          }
        }
    
        public boolean isDone() {
          return future.isDone() && innerIsDone();
        }
    
        private boolean innerIsDone() {
          while (true) {
            try {
              return future.get().isDone();
            } catch (CancellationException ce) {
              return true;
            } catch (ExecutionException ee) {
              return true;
            } catch (InterruptedException ie) {
              // pass
            }
          }
        }
      }
    }
    
        4
  •  0
  •   Dave    14 年前

    您可以创建一个类,如:

    public class UnwrapFuture<T> implements Future<T> {
        Future<Future<T>> wrappedFuture;
    
        public UnwrapFuture(Future<Future<T>> wrappedFuture) {
            this.wrappedFuture = wrappedFuture;
        }
    
        public boolean cancel(boolean mayInterruptIfRunning) {
            try {
                return wrappedFuture.get().cancel(mayInterruptIfRunning);
            } catch (InterruptedException e) {
                //todo: do something
            } catch (ExecutionException e) {
                //todo: do something
            }
        }
        ...
    }
    

    您必须处理get()可以引发但其他方法不能引发的异常。

        5
  •  0
  •   Nik    14 年前

    这是我第一次尝试,但我确信这有很多问题。我很乐意把它换成 Futures.compress(f) .

    public class CompressedFuture<T> implements Future<T> {
        private final Future<Future<T>> delegate;
    
        public CompressedFuture(Future<Future<T>> delegate) {
            this.delegate = delegate;
        }
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (delegate.isDone()) {
                return delegate.cancel(mayInterruptIfRunning);
            }
            try {
                return delegate.get().cancel(mayInterruptIfRunning);
            } catch (InterruptedException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            }
        }
    
        @Override
        public T get() throws InterruptedException, ExecutionException {
            return delegate.get().get();
        }
    
        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
            Future<T> next = delegate.get(timeout, unit);
            return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        @Override
        public boolean isCancelled() {
            if (!delegate.isDone()) {
                return delegate.isCancelled();
            }
            try {
                return delegate.get().isCancelled();
            } catch (InterruptedException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            }
        }
    
        @Override
        public boolean isDone() {
            if (!delegate.isDone()) {
                return false;
            }
            try {
                return delegate.get().isDone();
            } catch (InterruptedException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Error fetching a finished future", e);
            }
        }
    }