
Why isn't my router receiving registration events?

  • erip Jigar Trivedi  ·  6 years ago

    I'm starting to explore the new Akka Typed API. I'm trying to run an updated version of the random router from this blog post.

    My router is essentially the same:

    import java.util.concurrent.ThreadLocalRandom
    
    import akka.actor.Address
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.Behaviors
    import akka.cluster.ClusterEvent.{ReachabilityEvent, ReachableMember, UnreachableMember}
    import akka.cluster.typed.{Cluster, Subscribe}
    
    object RandomRouter {
      private final case class WrappedReachabilityEvent(event: ReachabilityEvent)
    
      // subscribes to cluster reachability events and
      // avoids routees that are unreachable
      def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
        Behaviors.setup[Any] { ctx ⇒
          ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
    
          val cluster = Cluster(ctx.system)
          // typically you have to map such external messages into this
          // actor's protocol with a message adapter
          val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(WrappedReachabilityEvent.apply)
    
          cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])
    
          def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
            Behaviors.receive { (ctx, msg) ⇒
              msg match {
                case serviceKey.Listing(services) ⇒
                  if (services.isEmpty) {
                    ctx.log.info("Found no services")
                  } else {
                    ctx.log.info(s"Found services: ${services.map(_.path.name).mkString(", ")}")
                  }
                  routingBehavior(services.toVector, unreachable)
                case WrappedReachabilityEvent(event) => event match {
                  case UnreachableMember(m) =>
                    ctx.log.warning(s"Member ${m.address} has become unreachable")
                    routingBehavior(routees, unreachable + m.address)
                  case ReachableMember(m) =>
                    ctx.log.info(s"Member ${m.address} has become reachable again")
                    routingBehavior(routees, unreachable - m.address)
                }
    
                case other: T @unchecked ⇒
                  if (routees.isEmpty)
                    Behaviors.unhandled
                  else {
                    val reachableRoutes =
                      if (unreachable.isEmpty) routees
                      else routees.filterNot { r => unreachable(r.path.address) }
    
                    val i = ThreadLocalRandom.current.nextInt(reachableRoutes.size)
                    reachableRoutes(i) ! other
                    Behaviors.same
                  }
              }
            }
    
          routingBehavior(Vector.empty, Set.empty)
        }.narrow[T]
    }
    

    My cluster spawns dummy actors:

    object DummyActor {
      def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] = Behaviors.setup { ctx =>
    
        ctx.log.info("Woohoo, I'm alive!")
        Behaviors.empty
      }
    }
    

    The cluster itself is as follows:

    object MyCluster {
    
      val serviceKey: ServiceKey[String] = ServiceKey[String]("cluster")
    
      val behavior: Behavior[String] = Behaviors.setup { ctx =>
    
        (1 to 5).foreach { i =>
          ctx.log.info("I'm so sleepy...")
          Thread.sleep(500)
          ctx.log.info(s"Spawning actor #$i")
          ctx.spawnAnonymous(DummyActor.behavior(serviceKey))
          ctx.log.info("I'm tired again...")
          Thread.sleep(500)
        }
    
        val router = ctx.spawn(RandomRouter.clusterRouter(serviceKey), "router")
    
        Behaviors.stopped
      }
    }
    

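    When I run the main below, I always see "Found no services" in my logs, which I take to mean that none of the dummy actors has registered with the cluster receptionist.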

    import akka.actor.typed.ActorSystem
    
    object Main extends App {
    
      val system = ActorSystem(MyCluster.behavior, "cluster-system")
    
    }
    

    What am I missing? I'm using Akka 2.5.12.

    1 answer
  •   erip Jigar Trivedi    6 years ago

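    The dummy actors need to register! This does not happen automatically. I solved it by adding the following line to the setup block: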

    ctx.system.receptionist ! Receptionist.Register(serviceKey, ctx.self)
    
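    import akka.actor.typed.Behavior
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.Behaviors

    object DummyActor {
      def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] = Behaviors.setup { ctx =>

        // Register this actor under serviceKey so that subscribers see it in their listings
        ctx.system.receptionist ! Receptionist.Register(serviceKey, ctx.self)

        ctx.log.info("Woohoo, I'm alive!")
        Behaviors.empty
      }
    }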
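
    To see the register/subscribe pairing in isolation, here is a minimal sketch that uses only the receptionist calls already shown in this thread. It assumes akka-actor-typed 2.5.12 on the classpath and leaves out the cluster subscription entirely. The names RegistrationSketch, "sketch-worker", and "sketch-system" are made up for illustration. The guardian returns Behaviors.empty rather than Behaviors.stopped, on the assumption that a stopped guardian would take its children down with it.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

object RegistrationSketch {
  // Hypothetical key, used only in this sketch
  val key: ServiceKey[String] = ServiceKey[String]("sketch-worker")

  // Worker: registers itself under the key, then logs every String it receives
  val worker: Behavior[String] = Behaviors.setup[String] { ctx =>
    ctx.system.receptionist ! Receptionist.Register(key, ctx.self)
    Behaviors.receive[String] { (ctx, msg) =>
      ctx.log.info(s"Worker got: $msg")
      Behaviors.same
    }
  }

  // Listener: subscribes to the key and greets whatever services are listed
  val listener: Behavior[Any] = Behaviors.setup[Any] { ctx =>
    ctx.system.receptionist ! Receptionist.Subscribe(key, ctx.self)
    Behaviors.receive[Any] { (ctx, msg) =>
      msg match {
        case key.Listing(services) =>
          ctx.log.info(s"Listing now contains ${services.size} service(s)")
          services.foreach(_ ! "hello")
          Behaviors.same
        case _ =>
          Behaviors.unhandled
      }
    }
  }

  // Guardian: spawns both children and stays alive so that they keep running
  val guardian: Behavior[Any] = Behaviors.setup[Any] { ctx =>
    ctx.spawn(listener, "listener")
    ctx.spawn(worker, "worker")
    Behaviors.empty
  }

  def main(args: Array[String]): Unit = {
    // Runs until the process is stopped; this sketch never terminates the system
    ActorSystem(guardian, "sketch-system")
  }
}
```

    If the Register line in the worker is removed, the listener should only ever see empty listings, which matches the "Found no services" symptom in the question.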