代码之家  ›  专栏  ›  技术社区  ›  Mohsen Kamrani

MqttClient发送最后一条消息之前的消息

  •  0
  • Mohsen Kamrani  · 技术社区  · 6 年前

    在我的代码中,这是一个处理程序(无服务器框架),我将消息推送到RabbitMQ,但问题是,当我发送第一条消息时,订阅者没有收到任何东西,而对于第二条消息,订阅者只收到第一条消息,其余消息也会收到同样的消息(当我发送消息时,会传递前一条消息!)。有什么想法吗?

    编辑 :我用非常基本和简单的代码替换了实际代码,但结果仍然相同。

    Lambda-创建。ts

    import { APIGatewayEvent, Context, Callback, Handler } from "aws-lambda";
    import { config } from "../common/config";
    import publish from "../common/publisher";
    
    export const create: Handler = (event: APIGatewayEvent, context: Context, cb: Callback) => {
        console.log('test started');
        context.callbackWaitsForEmptyEventLoop = false;
        const topic = 'float/push';
        const num = Math.random();
        const message = JSON.stringify({ floatId: num });
        publish(config.PUSH_BROKER_UFRL, config.PUSH_USERNAME, config.PUSH_PASSWORD, topic, message, () => {
            console.log('calling the callback');
            cb(null, {
                statusCode: 200,
                headers: {
                    'Access-Control-Allow-Origin': '*', // Required for CORS support to work
                },
                body: JSON.stringify({ id: num })
            });
        });
    };
    

    出版商ts

    import { Callback } from 'aws-lambda';
    import { Client, connect, MqttClient, Packet } from 'mqtt';
    
    function publish(brokerUrl: string, username: string, password: string, topic: string,
                     message: string, callback: (() => void)): void {
        console.log('publish started');
        const client: Client = connect(brokerUrl, {
            username,
            password
        });
        client.options.clientId = 'Cashmanager.portal';
        client.addListener('connect', () => {
            console.log('connected to the queue');
            console.log(`message to publish: ${JSON.stringify(message)}`);
            client.publish(topic, message, (err, packet) => {
                console.log(`err: ` + err);
                console.log(`packet: ${JSON.stringify(packet)}`);
                callback();
            });
        });
    }
    

    导出默认发布;

    cloudwatch日志示例:

    启动请求ID:ea63e6ca-318f-11e8-b766-b78fb7754d27版本:$最新 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27测试 已启动 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27发布 已启动 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27已连接 到队列 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27消息 要发布:“{\“floatId\”:0.24342369749799642}” 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27错误: 未定义 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27数据包: 未定义 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27呼叫 回调结束请求ID:ea63e6ca-318f-11e8-b766-b78fb7754d27

    1 回复  |  直到 6 年前
        1
  •  0
  •   Mohsen Kamrani    6 年前

    所以答案是你应该打电话 client#end 方法。这可能看起来很麻烦,但如果您想通过对Lambda函数的调用一次又一次地发布,我认为这将减少再次打开连接的机会。

    client.addListener('connect', () => {
        console.log('connected to the queue');
        console.log(`message to publish: ${JSON.stringify(message)}`);
        client.publish(topic, message, (err, packet) => {
            console.log(`err: ` + err);
            console.log(`packet: ${JSON.stringify(packet)}`);
            client.end(false, () => callback()); //This line should be added then it works as expected
    
        });
    });