代码之家  ›  专栏  ›  技术社区  ›  Joe K

Java“顺序”信号量

  •  1
  • Joe K  · 技术社区  · 6 年前

    我有一个问题,对于如何解决它,我有一个模糊的想法,但我会尽量过多地考虑上下文,以避免出现错误 XY problem .

    我有一个非同步的方法,可以立即返回 guava ListenableFuture 我需要打几十万或几百万次电话(未来本身可能需要一点时间才能完成)。我真的无法改变这种方法的内部结构。它内部涉及一些严重的资源争用,所以我想限制一次对该方法调用的次数。所以我试着用 Semaphore :

    public class ConcurrentCallsLimiter<In, Out> {
      private final Function<In, ListenableFuture<Out>> fn;
      private final Semaphore semaphore;
      private final Executor releasingExecutor;
    
      public ConcurrentCallsLimiter(
          int limit, Executor releasingExecutor,
          Function<In, ListenableFuture<Out>> fn) {
        this.semaphore = new Semaphore(limit);
        this.fn = fn;
        this.releasingExecutor = releasingExecutor;
      }
    
      public ListenableFuture<Out> apply(In in) throws InterruptedException {
        semaphore.acquire();
        ListenableFuture<Out> result = fn.apply(in);
        result.addListener(() -> semaphore.release(), releasingExecutor);
        return result;
      }
    }
    

    这样我就可以把我的电话放在这节课上,改为打电话:

    ConcurrentLimiter<Foo, Bar> cl =
        new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
    for (Foo foo : foos) {
      ListenableFuture<Bar> bar = cl.apply(foo);
      // handle bar (no pun intended)
    }
    

    某种程度上 作品问题是尾部延迟非常糟糕。有些调用会变得“不走运”,最终花很长时间试图获取该方法调用中的资源。一些内部指数退避逻辑加剧了这种情况,例如,与新呼叫相比,不走运的呼叫获得所需资源的机会越来越少,而新呼叫更急切,等待时间更短,然后重试。

    理想的解决方法是,如果有类似于那个信号量的东西,但它有一个顺序的概念。例如,如果限制为10,则第11次呼叫当前必须等待 任何 在前10个要完成的项目中。我想要的是第11个电话必须等到 第一 呼叫完成。这样一来,“不走运”的电话就不会因为不断有新的来电而继续挨饿。

    似乎我可以为调用分配一个整数序列号,并以某种方式跟踪尚未完成的最低序列号,但我不太明白如何实现这一点,尤其是因为实际上没有任何有用的“等待”方法 AtomicInteger 或者别的什么。

    3 回复  |  直到 6 年前
        1
  •  0
  •   HPCS    6 年前

    您可以使用公平参数创建信号量:

    Semaphore(int permits, boolean fair)
    

    //公平-如果此信号量将保证先入先出授予争用许可证,则为true,否则为false

        2
  •  0
  •   Joe K    6 年前

    我想到的一个不太理想的解决方案是使用 Queue 要存储所有的未来:

    public class ConcurrentCallsLimiter<In, Out> {
      private final Function<In, ListenableFuture<Out>> fn;
      private final int limit;
      private final Queue<ListenableFuture<Out>> queue = new ArrayDeque<>();
    
      public ConcurrentCallsLimiter(int limit, Function<In, ListenableFuture<Out>> fn) {
        this.limit = limit;
        this.fn = fn;
      }
    
      public ListenableFuture<Out> apply(In in) throws InterruptedException {
        if (queue.size() == limit) {
          queue.remove().get();
        }
        ListenableFuture<Out> result = fn.apply(in);
        queue.add(result);
        return result;
      }
    }
    

    然而,这似乎是对记忆的极大浪费。涉及的对象可能有点大,限制可能设定得相当高。所以我愿意接受没有O(n)内存使用的更好答案。似乎它在O(1)中是可行的。

        3
  •  0
  •   Alexei Kaigorodov    6 年前

    “如果有类似的信号,但有秩序的概念。” 有这样一头野兽。它叫 Blocking queue .创建这样一个队列,放置10个项目,然后使用 take 而不是 acquire put 而不是 release .