我正在使用芹菜编写一个数据处理管道,因为这大大加快了速度。
from celery.result import ResultSet
from some_celery_app import processing_task # of type @app.task
def crunch_data():
results = ResultSet([])
for document in mongo.find(): #Around 100K - 1M documents
job = processing_task.delay(document)
results.add(job)
return results.get()
collected_data = crunch_data()
#Do some stuff with this collected data
我成功地生成了四个启用了并发的worker,当我运行这个脚本时,数据会得到相应的处理,我可以做任何我想做的事情。
我使用RabbitMQ作为消息代理和
rpc
作为后端。
-
首先,处理所有文件
-
results.get()
打电话。
我的问题是:有没有一种方法可以同时进行处理和随后的检索?在我的例子中,由于所有文档都是互不依赖的原子实体,因此似乎不需要等待作业被完全处理。