代码之家  ›  专栏  ›  技术社区  ›  synic

演员“排队”?

  •  3
  • synic  · 技术社区  · 14 年前

    在Java中,为了编写一个向服务器发出请求的库,我通常实现某种调度器(与Twitter 4J库中发现的一个不同): http://github.com/yusuke/twitter4j/blob/master/twitter4j-core/src/main/java/twitter4j/internal/async/DispatcherImpl.java )限制连接数,执行异步任务等。

    其思想是创建n个线程。一个“任务”被排队并通知所有线程,当它准备就绪时,其中一个线程将从队列中弹出一个项目,完成工作,然后返回到等待状态。如果所有线程都忙于处理某个任务,那么该任务只是排队,下一个可用的线程将处理它。

    这将保持到n的最大连接数,并允许最多n个任务同时运行。

    我想知道我能用演员创造什么样的系统来完成同样的事情?有没有一种方法可以让n个参与者参与进来,当一条新消息准备好时,将它传递给一个参与者来处理它——如果所有参与者都忙,只需将消息排队即可?

    2 回复  |  直到 14 年前
        1
  •  4
  •   Vasil Remeniuk    14 年前

    Akka Framework 是为了解决这类问题而设计的,这正是你想要的。

    看看这个 docu -有许多高度可配置的Dispathers(基于事件、基于线程、负载平衡、工作窃取等)来管理参与者邮箱,并允许它们协同工作。你也会发现有趣 this blog post .

    例如,此代码基于固定线程池实例化新的窃取工作调度程序,该调度程序在它所监控的参与者之间实现负载平衡:

      val workStealingDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
      workStealingDispatcher
      .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
      .setCorePoolSize(16)
      .buildThreadPool
    

    使用调度程序的参与者:

    class MyActor extends Actor {
    
        messageDispatcher = workStealingDispatcher
    
        def receive = {
          case _ =>
        }
      }
    

    现在,如果启动2个以上的actor实例,那么调度程序将在actor的邮箱(队列)之间平衡负载(邮箱中消息太多的actor将“捐赠”一些给那些没有任何功能的actor)。

        2
  •  1
  •   Daniel C. Sobral    14 年前

    好吧,您必须了解actors调度程序,因为actors通常不是带线程的1对1。参与者背后的想法是,您可能拥有许多参与者,但实际线程数量将限制在合理的范围内。它们也不应该长时间运行,而是很快地响应接收到的消息。简而言之,该代码的体系结构似乎与设计参与者系统的方式完全不一致。

    不过,每个工作参与者可能会向队列参与者发送一条消息,请求下一个任务,然后进行循环以做出反应。此队列参与者将接收排队消息或出列消息。它可以这样设计:

    val q: Queue[AnyRef] = new Queue[AnyRef]
    loop {
      react {
        case Enqueue(d) => q enqueue d
        case Dequeue(a) if q.nonEmpty => a ! (q dequeue)
        }
    }