代码之家  ›  专栏  ›  技术社区  ›  lennyklb

如何在芹菜中异步接收已处理的消息?

  •  0
  • lennyklb  · 技术社区  · 6 年前

    我正在使用芹菜编写一个数据处理管道,因为这大大加快了速度。

        from celery.result import ResultSet
        from some_celery_app import processing_task # of type @app.task
    
        def crunch_data():
            results = ResultSet([])
            for document in mongo.find(): #Around 100K - 1M documents
                job = processing_task.delay(document)
                results.add(job)
    
            return results.get()
    
        collected_data = crunch_data()
        #Do some stuff with this collected data
    
    

    我成功地生成了四个启用了并发的worker,当我运行这个脚本时,数据会得到相应的处理,我可以做任何我想做的事情。

    我使用RabbitMQ作为消息代理和 rpc 作为后端。

    1. 首先,处理所有文件
    2. results.get() 打电话。

    我的问题是:有没有一种方法可以同时进行处理和随后的检索?在我的例子中,由于所有文档都是互不依赖的原子实体,因此似乎不需要等待作业被完全处理。

    1 回复  |  直到 6 年前
        1
  •  0
  •   SargeATM    6 年前

    您可以在中尝试回调参数 ResultSet.get(callback=cbResult) 然后可以在回调中处理结果。

    def cbResult(task_id, value):
      print(value)
    results.get(callback=cbResult)