代码之家  ›  专栏  ›  技术社区  ›  andilabs

@task\u postrun。连接芹菜中的信号并执行另一个任务会导致一些执行的内场循环

  •  0
  • andilabs  · 技术社区  · 6 年前

    我需要以下工作流程为我的芹菜任务。

    当taskA成功完成时,我想执行taskB。

    我知道有信号 @task_success 但这只返回任务的结果,我需要访问前一个任务的参数。所以我决定编写如下代码:

    @app.task
    def taskA(arg):
        # not cool, but... https://github.com/celery/celery/issues/3797
        from shopify.tasks import taskA
        taskA(arg)
    
    
    @task_postrun.connect
    def fetch_taskA_success_handler(sender=None, **kwargs):
        from gcp.tasks import taskB
        if kwargs.get('state') == 'SUCCESS':
            taskB.apply_async((kwargs.get('args')[0], ))
    

    问题是 taskB 似乎在一些无休止的循环中执行了很多次,而不是只有一次。

    1 回复  |  直到 6 年前
        1
  •  1
  •   andilabs    6 年前

    @app.task
    def taskA(arg):
        # not cool, but... https://github.com/celery/celery/issues/3797
        # otherwise it won't added in periodic tasks
        from shopify.tasks import taskA
        return taskA(arg)
    
    
    @task_postrun.connect
    def taskA_success_handler(sender=None, state=None, **kwargs):
    
        resource_name = kwargs.get('kwargs', {}).get('resource_name')
    
        if resource_name and state == 'SUCCESS':
    
            if sender.name == 'shopify.tasks.taskA':
                from gcp.tasks import taskB
                taskB.apply_async(kwargs={
                    'resource_name': resource_name
                })
    

    仅供参考:

    celery==4.1.0
    Django==2.0
    django-celery-beat==1.1.0
    django-celery-results==1.0.1
    flower==0.9.2
    amqp==2.2.2
    
    Python 3.6