代码之家  ›  专栏  ›  技术社区  ›  Alireza Keshvari

如何在ws和express中向具有不同工作人员的所有客户端广播消息?

  •  1
  • Alireza Keshvari  · 技术社区  · 1 年前

    在我的express应用程序中,我使用集群和ws包来运行带有套接字连接的项目。当客户端连接到套接字时,其中一个工作进程将处理该连接(例如id为1的工作进程)。但是,当另一个客户端连接时,另一个工作进程将处理该连接(id为2的工作进程)。如果客户端1发送消息进行广播,则只有具有相同工作者id的客户端接收到该消息,而客户端2什么也没接收到。如何将数据广播给具有不同工作id的所有客户端?当我不使用集群时,它可以工作。

    const run = () => {
      if (cluster.isPrimary) {
        for (let index = 0; index < numberCPUs; index++) {
          cluster.fork();
        }
        cluster.on("exit", (worker, code, signal) => {
          console.error(`worker ${worker.process.pid} died (${signal || code}). restarting it in a sec`);
          setTimeout(() => cluster.fork(), 1000);
        });
      } else {
        console.log(`Worker ${cluster.worker.id} : started`);
        const http = require("node:http");
        const WebSocket = require("ws");
    
        const server = http.createServer(application);
        const wss = new WebSocket.Server({ server });
    
        wss.on("connection", (ws, request, client) => {
          console.log(`Worker ${cluster.worker.id} handled new connection !`);
          ws.on("message", (message) => {
            console.log(`Worker ${cluster.worker.id} : Received message ${message}`);
            broadcast(WebSocket, wss, message);
          });
        });
    
        server.listen(port, () => {
          console.log(`Server is running on ${port} ...`);
        });
      }
    };
    

    广播功能:

    const broadcast = (webSocket, wss, message) => {
      for (const client of wss.clients) {
        if (client.readyState === webSocket.OPEN) {
          client.send(message, { binary: false });
        }
      }
    };
    
    1 回复  |  直到 1 年前
        1
  •  1
  •   Nazrul Chowdhury    1 年前

    当在Node.js中使用集群来处理WebSocket连接时,实际上是在运行应用程序的多个实例,每个实例都有自己的一组工作进程。在您当前的方法中,您尝试使用broadcast函数向客户端广播消息,该函数对连接到WebSocket服务器(ws)的所有客户端进行迭代。这种方法只适用于连接到广播客户端所连接的同一工作进程的客户端。
    要在不同的工作进程中向所有客户端广播消息,您需要一种在不同工作进程之间共享WebSocket连接或消息的方法。一种方法可以是使用像Redis Pub/Sub这样的集中式消息传递机制。这将允许跨工作者通信来广播WebSocket消息。

        2
  •  0
  •   Alireza Keshvari    1 年前

    借助这个 answer 我解决了这个问题。 我曾经 publish subscribe 在redis中。我创建了 订阅 我的频道 app.js 文件:

    const server = http.createServer(application);
    const wss = new WebSocket.Server({ server });
    const WebSocket = require("ws");
    const redis = require("redis");
    
    
    // other codes ...
    
    const redisClient = redis.createClient({
      legacyMode: true,
    });
    const broadcastClient = redis.createClient();
    broadcastClient
      .connect()
      .then(() => {
        broadcastClient.subscribe("broadcast-channel", (message) => {
          for (const client of wss.clients) {
            if (client.readyState === WebSocket.OPEN) {
              client.send(message);
            }
          }
        });
    
        wss.on("connection", (ws) => {
          ws.on("message", (message) => {
            console.log(`Message: ${message}`);
          });
        });
      })
      .catch((error) => {
        console.log(error);
      });
    
    const run = () => {
      if (cluster.isPrimary) {
        for (let index = 0; index < numberCPUs; index++) {
          cluster.fork();
        }
        cluster.on("exit", (worker, code, signal) => {
          console.error(`worker ${worker.process.pid} died (${signal || code}). restarting it in a sec`);
          setTimeout(() => cluster.fork(), 1000);
        });
      } else {
        server.listen(port, () => {
          console.log(`Worker ${cluster.worker.id} : started on ${port}`);
        });
      }
    };
    

    然后用 出版 ,我发了信息。这是我要在其中广播消息的另一个文件:

    const redis = require("redis");
    
    const redisClient = redis.createClient();
    await redisClient.connect();
    redisClient.publish("broadcast-channel", "Hellow world!");