我在用
celery
具有
rabbitMQ
作为代理来管理异步任务。
我将一些任务组合在一起,当每个任务完成后,我将运行一个
串联任务
这将加入每个结果。这些任务集合由
报告id
由我设定。
chord([task1.s(report_id=report_id), task2.s(report_id=report_id)...]) \
(concat_task.s(report_id=report_id).set(queue='default') \
.on_error(on_chord_error.s().set(queue='default')))
这是我的习惯
on_error
任务:
@celery_app.task
def on_chord_error(request, exc, traceback):
logger.error('Chord {0!r} raised error: {1!r}'.format(request.id, exc), exc_info=(type(exc), exc.args, traceback))
# BaseHandler.remove_placeholder(???report_id???) # cleaning action
我的问题在电话上
论错误
当前位置每次和弦失败(无论什么原因),我都需要
报告id
(不要与request.id混淆)传递给它以执行清理操作。然而
documentation
我发现这一点很模糊。似乎我无法像处理单任务和串联任务那样,将request_id作为任务签名的参数传递。
任何帮助都将不胜感激