代码之家  ›  专栏  ›  技术社区  ›  Davide Icardi i3arnon

当客户端关闭Web套接字连接时停止Akka流源

  •  0
  • Davide Icardi i3arnon  · 技术社区  · 6 年前

    我有一个 akka http web socket Route 代码类似于:

    private val wsreader:路由= 路径(“v1”/“data”/“ws”)。{ log.info(“正在打开WebSocket连接…”)

      val testSource = Source
        .repeat("Hello")
        .throttle(1, 1.seconds)
        .map(x => {
          println(x)
          x
        })
        .map(TextMessage.Strict)
        .limit(1000)
    
      extractUpgradeToWebSocket { upgrade ⇒
        complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
      }
    }
    

    一切正常(我每秒从客户机收到1条测试消息)。唯一的问题是我不知道如何停止/关闭 Source ( testSource )如果客户端关闭了Web套接字连接。

    可以看到源继续生成元素(请参见 println )另外,如果Web套接字已关闭。

    如何检测客户端断开连接?

    2 回复  |  直到 6 年前
        1
  •  1
  •   Arnout Engelen    6 年前

    具有sinksource的handlemessages实现为:

    /**
     * The high-level interface to create a WebSocket server based on "messages".
     *
     * Returns a response to return in a request handler that will signal the
     * low-level HTTP implementation to upgrade the connection to WebSocket and
     * use the supplied inSink to consume messages received from the client and
     * the supplied outSource to produce message to sent to the client.
     *
     * Optionally, a subprotocol out of the ones requested by the client can be chosen.
     */
    def handleMessagesWithSinkSource(
      inSink:      Graph[SinkShape[Message], Any],
      outSource:   Graph[SourceShape[Message], Any],
      subprotocol: Option[String]                   = None): HttpResponse =
    
      handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
    

    这意味着接收器和源是独立的,甚至当客户端关闭连接的传入端时,源也应该继续生成元素。但是,当客户端完全重置连接时,它应该停止。

    要在传入连接关闭后立即停止生成传出数据,可以使用 Flow.fromSinkAndSourceCoupled 因此:

    val socket = upgrade.handleMessages(
      Flow.fromSinkAndSourceCoupled(inSink, outSource)
      subprotocol = None
    )
    
        2
  •  1
  •   Davide Icardi i3arnon    6 年前

    一种方法是使用 KillSwitches 处理testsource关闭。

    private val wsReader: Route =
    path("v1" / "data" / "ws") {
      logger.info("Opening websocket connecting ...")
    
      val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
    
      val testSource =
        Source
         .repeat("Hello")
         .throttle(1, 1.seconds)
         .map(x => {
           println(x)
           x
         })
        .map(TextMessage.Strict)
        .limit(1000)
        .via(sharedKillSwitch.flow)
    
      extractUpgradeToWebSocket { upgrade ⇒
        val inSink = Sink.onComplete(_ => sharedKillSwitch.shutdown())
        val outSource = testSource
        val socket = upgrade.handleMessagesWithSinkSource(inSink, outSource)
    
        complete(socket)
      }
    }