代码之家  ›  专栏  ›  技术社区  ›  Mrugesh

amqp.connect 无法永久保持连接

  •  0
  • Mrugesh  · 技术社区  · 6 年前

    rabbitmq.js

    /**
     * Opens an AMQP connection using the CLOUDAMQP_MQTT_URL environment variable,
     * creates a channel on it, and stores both in the module-level `connection`
     * and `channel` variables. A failed connect is retried after one second and a
     * closed connection triggers an immediate reconnect.
     *
     * @param {Function} [onReady] - Optional callback invoked (with no arguments)
     *   once the channel has been created and stored in `channel`.
     */
    const connectRabbitMq = (onReady) => {
        amqp.connect(process.env.CLOUDAMQP_MQTT_URL, function (err, conn) {
            if (err) {
                console.error(err);
                console.log('[AMQP] reconnecting in 1s');
                // setTimeout takes milliseconds: 1000 matches the "1s" announced above
                // (a delay of 1 would retry almost immediately and hammer the broker).
                // onReady is forwarded so a waiting caller is still notified after the retry.
                setTimeout(() => connectRabbitMq(onReady), 1000);
                return;
            }

            conn.createChannel((channelErr, ch) => {
                if (channelErr) {
                    // Surface the failure instead of dropping it silently.
                    console.error('[AMQP] channel error', channelErr.message);
                    return;
                }
                console.log('Channel created');
                channel = ch;
                connection = conn;
                if (typeof onReady === 'function') {
                    onReady();
                }
            });

            conn.on("error", function (connErr) {
                // "Connection closing" is reported during shutdown and is not worth logging.
                if (connErr.message !== "Connection closing") {
                    console.error("[AMQP] conn error", connErr.message);
                }
            });

            conn.on("close", function () {
                console.error("[AMQP] reconnecting");
                connectRabbitMq();
            });
        });
    };
    
    
    /**
     * Publishes a fixed test payload ({ user_id: 1, test_id: 2 }) to queue `q`
     * as a persistent message.
     *
     * When the module-level `channel` is set the message is sent immediately;
     * otherwise a connection is requested and the publish is handed to
     * `connectRabbitMq` as a callback.
     */
    const sendMessage = () => {
        const data = {
            user_id: 1,
            test_id: 2
        };

        // Shared by both branches. `channel` is read at call time, so the deferred
        // call uses whatever channel exists by then.
        // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
        const publish = () => {
            channel.sendToQueue(q, Buffer.from(JSON.stringify(data)), {
                persistent: true
            });
        };

        if (channel) {
            publish();
            return;
        }

        // NOTE: this only works if connectRabbitMq invokes the callback it is
        // given after `channel` has been assigned.
        connectRabbitMq(publish);
    };
    
    
    /**
     * Registers a consumer on queue `q` that logs every received message body.
     *
     * When the module-level `channel` is set the consumer is registered
     * immediately; otherwise a connection is requested and the registration is
     * handed to `connectRabbitMq` as a callback.
     */
    const receiveMessage = () => {
        const onMessage = (msg) => {
            // amqplib calls the consumer with null when the broker cancels the
            // consumer; dereferencing msg.content would throw in that case.
            if (msg == null) {
                console.warn('[AMQP] consumer cancelled');
                return;
            }
            // NOTE(review): the message is never acknowledged here; unless the
            // consumer is registered with noAck this leaves it unacked — confirm intent.
            console.log(" [x] Received %s", msg.content.toString());
        };

        // `channel` is read at call time so the deferred path uses the new channel.
        const startConsuming = () => {
            channel.consume(q, onMessage);
        };

        if (channel) {
            startConsuming();
            return;
        }

        // NOTE: this only works if connectRabbitMq invokes the callback it is
        // given after `channel` has been assigned.
        connectRabbitMq(startConsuming);
    };
    

    scheduler.js

    let cron = require('node-cron');
    
    
    /**
     * Schedules a cron job with the six-field expression below (seconds field
     * first) whose tick calls rabbitMqClientPipeline.receiveMessage().
     *
     * NOTE(review): if receiveMessage registers a consumer via channel.consume on
     * every call, each tick likely adds another consumer — confirm this is intended.
     */
    const callMethodForeverRabbitMq = () => {
        const schedule = '*/1 * * * * *';

        const pollQueue = () => {
            rabbitMqClientPipeline.receiveMessage();
        };

        cron.schedule(schedule, pollQueue);
    };
    

    app.js

    // Application entry point: publish the test message once.
    // Presumably `rabbitmq` is the module exported from rabbitmq.js — confirm the import.
    rabbitmq.sendMessage();
    

    现在的问题是:这段代码无法让连接一直保持。有没有办法让连接始终保持存活?

    1 回复  |  直到 6 年前
        1
  •  3
  •   m1ch4ls    6 年前

    我不确定你是使用Promise api还是callback api。

    使用Promise API,您可以这样做:

    const amqp = require('amqplib');
    
    // Returns a promise that resolves (with no value) after `ms` milliseconds.
    const delay = (ms) => new Promise((done) => {
      setTimeout(done, ms);
    });
    
    /**
     * Connects to the broker at amqp://127.0.0.1:5672 (Promise API), attaches
     * connection-level handlers and creates a channel.
     *
     * Any rejection in the chain is logged and followed by a new attempt after
     * one second; a "close" event on the connection triggers a reconnect.
     *
     * @returns {Promise<undefined>} Settles once a channel has been created.
     */
    const connectRabbitMq = () => {
      // Wire up connection events, then ask for a channel.
      const onConnected = (conn) => {
        conn.on('error', (err) => {
          if (err.message === 'Connection closing') {
            return;
          }
          console.error('[AMQP] conn error', err.message);
        });

        conn.on('close', () => {
          console.error('[AMQP] reconnecting');
          connectRabbitMq();
        });

        // Keep a reference to `conn` here if other code needs the connection.
        return conn.createChannel();
      };

      const onChannel = () => {
        console.log('Channel created');
        // Keep a reference to the channel here if other code needs it.
      };

      // Log the failure, wait one second, then start over.
      const retryAfterDelay = (error) => {
        console.error(error);
        console.log('[AMQP] reconnecting in 1s');
        return delay(1000).then(() => connectRabbitMq());
      };

      return amqp.connect('amqp://127.0.0.1:5672')
        .then(onConnected)
        .then(onChannel)
        .catch(retryAfterDelay);
    };
    
    // Start the first connection attempt; failures are retried inside connectRabbitMq.
    connectRabbitMq();
    

    const amqp = require('amqplib/callback_api');
    
    /**
     * Connects to the broker at amqp://127.0.0.1:5672 (callback API), creates a
     * channel and attaches connection-level handlers.
     *
     * A failed connect is retried after one second; a "close" event on the
     * connection triggers an immediate reconnect.
     */
    const connectRabbitMq = () => {
      amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
        if (err) {
          console.error(err);
          console.log('[AMQP] reconnecting in 1s');
          setTimeout(connectRabbitMq, 1000);
          return;
        }

        conn.createChannel((channelErr, ch) => {
          if (channelErr) {
            // Report the failure instead of silently ignoring it; the connection
            // itself stays open, so no reconnect is triggered from here.
            console.error('[AMQP] channel error', channelErr.message);
            return;
          }
          console.log('Channel created');
          // Store `ch` and `conn` here if other code needs them.
        });

        conn.on("error", function (connErr) {
          // "Connection closing" is reported during shutdown and is not worth logging.
          if (connErr.message !== "Connection closing") {
            console.error("[AMQP] conn error", connErr.message);
          }
        });

        conn.on("close", function () {
          console.error("[AMQP] reconnecting");
          connectRabbitMq();
        });
      });
    };
    
    // Start the first connection attempt; failed connects are retried inside connectRabbitMq.
    connectRabbitMq();
    

    更新:带请求缓冲的新代码

    // Requests (functions) queued while no channel is available; connectRabbitMq
    // runs and removes them once a channel has been created.
    const buffer = [];
    // Current connection and channel; both are reset to null when the connection
    // reports "error" or "close".
    let connection = null;
    let channel = null;
    
    /**
     * Connects to the broker at amqp://127.0.0.1:5672, creates a channel, stores
     * both in the module-level `connection` / `channel` variables and then runs
     * every request queued in `buffer`.
     *
     * A failed connect is retried after one second. When the connection reports
     * "error" or "close", `channel` and `connection` are reset to null; "close"
     * also triggers an immediate reconnect.
     */
    const connectRabbitMq = () => {
      amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
        if (err) {
          console.error(err);
          console.log('[AMQP] reconnecting in 1s');
          setTimeout(connectRabbitMq, 1000);
          return;
        }

        conn.createChannel((channelErr, ch) => {
          if (channelErr) {
            // Report the failure instead of silently ignoring it; queued requests
            // stay in `buffer` for a later successful attempt.
            console.error('[AMQP] channel error', channelErr.message);
            return;
          }
          console.log('Channel created');
          channel = ch;
          connection = conn;

          // Drain in FIFO order (shift, not pop) so queued requests run in the
          // order they were submitted.
          while (buffer.length > 0) {
            const request = buffer.shift();
            request();
          }
        });

        conn.once("error", function (connErr) {
          // Drop the stale handles so callers queue requests instead of using them.
          channel = null;
          connection = null;

          if (connErr.message !== "Connection closing") {
            console.error("[AMQP] conn error", connErr.message);
          }
        });

        conn.once("close", function () {
          channel = null;
          connection = null;

          console.error("[AMQP] reconnecting");
          connectRabbitMq();
        });
      });
    };
    
    /**
     * Publishes a fixed test payload ({ user_id: 1, test_id: 2 }) to queue `q`
     * as a persistent message.
     *
     * When the module-level `channel` is set the message is sent immediately;
     * otherwise the publish is queued in `buffer` for later execution.
     */
    const sendMessage = () => {
      const data = {
        user_id: 1,
        test_id: 2
      };

      // `channel` is read at call time, so a queued publish uses the channel that
      // exists when the buffer is drained.
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      const publish = () => {
        channel.sendToQueue(q, Buffer.from(JSON.stringify(data)), {
          persistent: true
        });
      };

      if (channel) {
        publish();
      }
      else {
        buffer.push(publish);
      }
    };
    
    /**
     * Registers a consumer on queue `q` that logs every received message body.
     *
     * When the module-level `channel` is set the consumer is registered
     * immediately; otherwise the registration is queued in `buffer` for later
     * execution.
     */
    const receiveMessage = () => {
      const onMessage = (msg) => {
        // amqplib calls the consumer with null when the broker cancels the
        // consumer; dereferencing msg.content would throw in that case.
        if (msg == null) {
          console.warn('[AMQP] consumer cancelled');
          return;
        }
        // NOTE(review): the message is never acknowledged here; unless the
        // consumer is registered with noAck this leaves it unacked — confirm intent.
        console.log(" [x] Received %s", msg.content.toString());
      };

      // `channel` is read at call time so a queued registration uses the channel
      // that exists when the buffer is drained.
      const startConsuming = () => {
        channel.consume(q, onMessage);
      };

      if (channel) {
        startConsuming();
      }
      else {
        buffer.push(startConsuming);
      }
    };
    

    在某些边缘情况下,这段代码无法正常工作——例如,重连之后不会自动重新建立 queue.consume,除非再次显式调用它。但总体而言,它应该能帮助你了解如何实现合适的恢复机制。