代码之家  ›  专栏  ›  技术社区  ›  kordax

拦截scala中socket close上的akka http websocket事件

  •  0
  • kordax  · 技术社区  · 6 年前

    我正在使用带有akka http的scala。

    我有一个基于akka http的服务器后端应用程序。传入的WebSocket消息使用 handleWebSocketMessages :

    /**
      * Route for WebSocket request
      */
    private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
        LOGGER.debug("() Web socket route")
        handleWebSocketMessages(wsRouteImpl)
    }
    
    /**
      * This method calls all registered handlers.
      *
      * @return flow for handleWebSocketMessages method
      */
    private def wsRouteImpl: Flow[Message, Message, Any] = {
        LOGGER.debug("() wsRouteHandling")
        numOfClients += 1
    
        LOGGER.info(s"Client has connected, current number of clients: $numOfClients")
    
        var flow = Flow[Message].mapConcat {
      // Call specific handlers depending on message type
      ...
    }
    

    我的websocket客户通过keep alive建立双向通信连接。

    绑定使用:

    val binding = Http().bindAndHandle(webSocketRoute, config.host, config.port)
    

    问题是,我需要为一个封闭的套接字注入一个回调(例如,如果一个客户机断开),并且减少当前的客户数,但是我找不到任何入口点。

    是否可以在套接字关闭时捕获某种事件?

    1 回复  |  直到 6 年前
        1
  •  3
  •   Jeffrey Chung    6 年前

    使用 watchTermination :

    val numOfClients = new java.util.concurrent.atomic.AtomicInteger(0)
    
    private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
      LOGGER.debug("() Web socket route")
    
      val wsFlow: Flow[Message, Message, Any] =
        wsRouteImpl.watchTermination() { (_, fut) =>
          numOfClients.incrementAndGet()
          LOGGER.info(s"Client has connected. Current number of clients: $numOfClients")
    
          fut onComplete {
            case Success(_) =>
              numOfClients.decrementAndGet()
              LOGGER.info(s"Client has disconnected. Current number of clients: $numOfClients")
            case Failure(ex) =>
              numOfClients.decrementAndGet()
              LOGGER.error(s"Disconnection failure (number of clients: $numOfClients): $ex")
          }
        }
    
      handleWebSocketMessages(wsFlow)
    }
    
    private def wsRouteImpl: Flow[Message, Message, Any] = ???