代码之家  ›  专栏  ›  技术社区  ›  vgkowski

Akka流从外部世界获取无限流的当前值

  •  0
  • vgkowski  · 技术社区  · 6 年前

    获取无限流的当前值的最佳方法是什么?该流聚集了值,并且根据定义永远不完整

    Source.repeat(1)
      .scan(0)(_+_)
      .to(Sink.ignore)
    

    我想从Akka HTTP查询当前计数器值。我应该使用动态流吗?broadcastHub然后根据GET请求从Akka http订阅无限流?

    2 回复  |  直到 6 年前
        1
  •  1
  •   Stefano Bonetti    6 年前

    一种解决方案是使用参与者来保持所需的状态。 Sink.actorRef 将现有的actor ref包装在接收器中,例如。

    class Keeper extends Actor {
      var i: Int = 0
    
      override def receive: Receive = {
        case n: Int ⇒ i = n
        case Keeper.Get ⇒ sender ! i
      }
    }
    
    object Keeper {
      case object Get
    }
    
    val actorRef = system.actorOf(Props(classOf[Keeper]))
    
    val q = Source.repeat(1)
      .scan(0)(_+_)
      .runWith(Sink.actorRef(actorRef, PoisonPill))
    
    val result = (actorRef ? Keeper.Get).mapTo[Int]
    

    请注意,使用时不会保留背压 下沉actorRef公司 . 这可以通过使用 Sink.actorRefWithAck . 有关更多信息,请参阅 docs .

        2
  •  0
  •   Didac Montero    4 年前

    一种可能性是使用 Sink.actorRefWithBackpressure .

    想象一下,有以下参与者来存储来自流的状态:

    object StremState {
      case object Ack
      sealed trait Protocol                         extends Product with Serializable
      case object StreamInitialized                 extends Protocol
      case object StreamCompleted                   extends Protocol
      final case class WriteState[A](value: A)      extends Protocol
      final case class StreamFailure(ex: Throwable) extends Protocol
      final case object GetState                    extends Protocol
    }
    
    class StremState[A](implicit A: ClassTag[A]) extends Actor with ActorLogging {
      import StremState._
    
      var state: Option[A] = None
    
      def receive: Receive = {
        case StreamInitialized =>
          log.info("Stream initialized!")
          sender() ! Ack // ack to allow the stream to proceed sending more elements
    
        case StreamCompleted =>
          log.info("Stream completed!")
    
        case StreamFailure(ex) =>
          log.error(ex, "Stream failed!")
    
        case WriteState(A(value)) =>
          log.info("Received element: {}", value)
          state = Some(value)
          sender() ! Ack // ack to allow the stream to proceed sending more elements
    
        case GetState =>
          log.info("Fetching state: {}", state)
          sender() ! state
    
        case other =>
          log.warning("Unexpected message '{}'", other)
    
      }
    }
    

    然后,可以在流的接收器中使用该演员,如下所示:

        implicit val tm: Timeout           = Timeout(1.second)
        val stream: Source[Int, NotUsed]   = Source.repeat(1).scan(0)(_+_)
    
        val receiver = system.actorOf(Props(new StremState[Int]))
        val sink = Sink.actorRefWithBackpressure(
          receiver,
          onInitMessage = StremState.StreamInitialized,
          ackMessage = StremState.Ack,
          onCompleteMessage = StremState.StreamCompleted,
          onFailureMessage = (ex: Throwable) => StremState.StreamFailure(ex)
        )
    
        stream.runWith(sink)
        // Ask for Stream current state to the receiver Actor
        val futureState = receiver ? GetState