
Running multiple Celery workers so each consumes from a unique queue

  • Shubham Mishra  ·  asked 7 years ago

    I have a use case where I need to start Celery workers so that each one consumes from a unique queue. I tried to implement it as follows.

    from celery import Celery
    
    app = Celery(broker='redis://localhost:9555/0')
    
    
    @app.task
    def afunction(arg1=None, arg2=None, arg3=None):
    
        if arg1 == 'awesome_1':
            return "First type of Queue executed"
        if arg2 == "awesome_2":
            return "Second Type of Queue executed"
        if arg3 == "awesome_3":
            return "Third Type of Queue executed"
    
    
    if __name__=='__main__':
        qlist = ["awesome_1", "awesome_2", "awesome_3"]
        arglist = [None, None, None]
        for q in qlist:
            arglist[qlist.index(q)] = q
            argv = [
                'worker',
                '--detach',
                '--queue={0}'.format(q),
                '--concurrency=1',
                '-E',
                '--loglevel=INFO'
            ]
            app.worker_main(argv)
            afunction.apply_async(args=[arglist[0], arglist[1], arglist[2]], queue=q)
    

    When executed, this code produces the following output:

    [2018-02-08 11:28:43,479: INFO/MainProcess] Connected to redis://localhost:9555/0
    [2018-02-08 11:28:43,486: INFO/MainProcess] mingle: searching for neighbors
    [2018-02-08 11:28:44,503: INFO/MainProcess] mingle: all alone
    [2018-02-08 11:28:44,527: INFO/MainProcess] celery@SYSTEM ready.
    [2018-02-08 11:28:44,612: INFO/MainProcess] Received task: __main__.afunction[f092f721-6523-4055-98fc-158ac316f4cc]
    [2018-02-08 11:28:44,618: INFO/ForkPoolWorker-1] Task __main__.afunction[f092f721-6523-4055-98fc-158ac316f4cc] succeeded in 0.0010992150055244565s: 'First type of Queue executed'
    

    So I can see that a worker starts in the first iteration of the for loop, but execution stops there and the for loop never advances.

    I think this happens because the worker does not run detached as a child process of the script: in `ps aux` I can see one more Python process running the same script than the `--concurrency` setting would account for. Any pointers on what is going wrong, or on how to make the worker run detached so that `worker_main` returns and the for loop keeps iterating, would be appreciated.
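    The symptom above is consistent with `app.worker_main(argv)` blocking until that worker shuts down, so the loop never reaches its second iteration. One way to keep the loop moving is to run each blocking call in its own child process. Below is a minimal sketch of that idea; `run_worker` is a hypothetical stand-in for the blocking `worker_main` call, not a real Celery worker:

    ```python
    import multiprocessing as mp
    import time


    def run_worker(queue_name):
        # Stand-in for app.worker_main(argv): a call that blocks
        # while it serves a queue.
        time.sleep(0.5)  # pretend to consume tasks


    def start_workers(queues):
        # Start one child process per queue and return immediately,
        # the way a detached worker would.
        procs = []
        for q in queues:
            p = mp.Process(target=run_worker, args=(q,))
            p.start()  # returns at once; the blocking call runs in the child
            procs.append(p)
        return procs


    if __name__ == "__main__":
        procs = start_workers(["awesome_1", "awesome_2", "awesome_3"])
        print("loop finished; %d workers running" % len(procs))
        for p in procs:
            p.join()
    ```

    Because `Process.start()` returns immediately, the parent loop completes all iterations instead of stalling inside the first one.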

    1 Answer  |  7 years ago

  •   Shubham Mishra  ·  answered 7 years ago

    Although I am not sure how this affects the infrastructure, I tried the following workaround and got the expected result. It would be great if someone could comment on whether there is a better way to solve this, but for now I am going with this solution.

    from celery import Celery
    import os
    import time
    
    
    app = Celery('app', broker='redis://localhost:9555/0')
    
    @app.task
    def afunction(arg1=None, arg2=None, arg3=None):
    
        if arg1 == 'awesome_1':
            return "First type of Queue executed"
        if arg2 == "awesome_2":
            return "Second Type of Queue executed"
        if arg3 == "awesome_3":
            return "Third Type of Queue executed"
    
    qlist = ["awesome_1", "awesome_2", "awesome_3"]
    arglist = [None, None, None]
    for q in qlist:
        os.system('nohup celery worker -A app.celery -Q {0} --loglevel=INFO --concurrency=1 &'.format(q))
        os.system('echo \'\\n\'')
    
    time.sleep(5)
    for q in qlist:
        arglist = [None, None, None]
        arglist[qlist.index(q)] = q
        afunction.apply_async(args=[arglist[0], arglist[1], arglist[2]], queue=q)
    

    This creates a nohup.out file with the following output:

    [2018-02-08 17:15:53,269: INFO/MainProcess] Connected to redis://localhost:9555/0
    [2018-02-08 17:15:53,272: INFO/MainProcess] Connected to redis://localhost:9555/0
    [2018-02-08 17:15:53,274: INFO/MainProcess] Connected to redis://localhost:9555/0
    [2018-02-08 17:15:53,277: INFO/MainProcess] mingle: searching for neighbors
    [2018-02-08 17:15:53,280: INFO/MainProcess] mingle: searching for neighbors
    [2018-02-08 17:15:53,280: INFO/MainProcess] mingle: searching for neighbors
    [2018-02-08 17:15:54,293: INFO/MainProcess] mingle: all alone
    [2018-02-08 17:15:54,295: INFO/MainProcess] mingle: all alone
    [2018-02-08 17:15:54,296: INFO/MainProcess] mingle: all alone
    [2018-02-08 17:15:54,304: INFO/MainProcess] celery@SYSTEM ready.
    [2018-02-08 17:15:54,304: INFO/MainProcess] celery@SYSTEM ready.
    [2018-02-08 17:15:54,306: INFO/MainProcess] celery@SYSTEM ready.
    [2018-02-08 17:15:57,975: INFO/MainProcess] Received task: app.afunction[e825444d-e123-4f55-9365-f36f95d62734]
    [2018-02-08 17:15:57,976: INFO/ForkPoolWorker-1] Task app.afunction[e825444d-e123-4f55-9365-f36f95d62734] succeeded in 0.0003634110325947404s: 'First type of Queue executed'
    [2018-02-08 17:15:57,976: INFO/MainProcess] Received task: app.afunction[80816d50-5680-4373-8b5e-dac2ae2a3ff9]
    [2018-02-08 17:15:57,977: INFO/MainProcess] Received task: app.afunction[0e88c758-3010-4d37-bda2-6a9a9a02bedf]
    [2018-02-08 17:15:57,977: INFO/ForkPoolWorker-1] Task app.afunction[80816d50-5680-4373-8b5e-dac2ae2a3ff9] succeeded in 0.0003187900292687118s: 'Second Type of Queue executed'
    [2018-02-08 17:15:57,978: INFO/ForkPoolWorker-1] Task app.afunction[0e88c758-3010-4d37-bda2-6a9a9a02bedf] succeeded in 0.00042019598186016083s: 'Third type of queue executed'
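    As an aside, the per-queue argument construction used in both snippets (the queue's own name in its slot, `None` everywhere else) can be captured in a small pure helper, which avoids mutating `arglist` in place on each iteration:

    ```python
    def args_for_queue(queues, q):
        # One positional slot per known queue; only the slot belonging
        # to q carries its name, all others stay None.
        return [name if name == q else None for name in queues]


    # e.g. afunction.apply_async(args=args_for_queue(qlist, q), queue=q)
    ```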