代码之家  ›  专栏  ›  技术社区  ›  Richiedlon

芹菜任务队列

  •  1
  • Richiedlon  · 技术社区  · 7 年前

    我创建了一个烧瓶应用程序,它由两个芹菜任务组成。

    任务1:通过流程生成文件

    任务2:通过电子邮件发送生成的文件

    通常情况下,任务1比任务2需要更多的时间。我想先执行任务1,然后执行任务2。但问题是两者在芹菜内同时开始执行。

    如何解决此问题。

    @celery.task(name='celery_example.process')
    def process(a,b,c,d,e,f):
        command='rnx2rtkp -p '+a+' -f '+b+' -m '+c+' -n -o oout.pos '+d+' '+e+' '+f
        os.system(command)
        return 'Successfully created POS file'
    
    @celery.task(name='celery_example.emailfile')
    def emailfile(recipientemail):  
        email_user = ''
        email_password = ''
        subject = 'subject'
        msg = MIMEMultipart()
        msg['From'] = email_user
        msg['To'] = recipientemail
        msg['Subject'] = subject
        body = 'This is your Post-Processed position file'
        msg.attach(MIMEText(body,'plain'))
        filename='oout.pos'
        attachment  =open(filename,'rb')
        part = MIMEBase('application','octet-stream')
        part.set_payload((attachment).read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition',"attachment; filename= "+filename)
        msg.attach(part)
        text = msg.as_string()
        server = smtplib.SMTP('smtp.gmail.com',587)
        server.starttls()
        server.login(email_user,email_password)
        server.sendmail(email_user,recipientemail,text)
        server.quit()
        return 'Email has been successfully sent'   
    

    这是应用程序。路线

    @app.route('/pp.php', methods=['GET', 'POST'])
    def pp():
        pp = My1Form()
        target = os.path.join(APP_ROOT)
        print(target)
    
        for fileBase in request.files.getlist("fileBase"):
            print(fileBase)
            filename = fileBase.filename
            destination = "/".join([target, filename])
            print(destination)
            fileBase.save(destination)
    
        for fileObsRover in request.files.getlist("fileObsRover"):
            print(fileObsRover)
            filename = fileObsRover.filename
            destination = "/".join([target, filename])
            print(destination)
            fileObsRover.save(destination)
    
        for fileNavRover in request.files.getlist("fileNavRover"):
            print(fileNavRover)
            filename = fileNavRover.filename
            destination = "/".join([target, filename])
            print(destination)
            fileNavRover.save(destination)
    
            a=fileObsRover.filename
            b=fileBase.filename
            c=fileNavRover.filename
            elevation=pp.ema.data
            Freq=pp.frq.data
            posMode=pp.pmode.data
            emailAdd=pp.email.data
    
            process.delay(posMode,Freq,elevation,a,b,c)
            emailfile.delay(emailAdd)
    
            return render_template('results.html', email=pp.email.data, Name=pp.Name.data, ema=elevation, frq=Freq, pmode=posMode, fileBase=a)
        return render_template('pp.php', pp=pp)
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   donkopotamus    7 年前

    目前,您的代码执行以下操作:

    # schedule process to run asynchronously
    process.delay(posMode,Freq,elevation,a,b,c)
    
    # schedule emailfile to run asynchronously
    emailfile.delay(emailAdd)
    

    这两项工作将立即由工人负责并执行。您已提供 无需通知 celery 那个 emailfile 应该等到 processfile 已完成

    相反,您应该:

    • 更改签名 电子邮件文件 包含另一个参数,该参数将作为成功 进程文件 呼叫然后
    • 呼叫 进程文件 使用 link

    例如:

    deferred = processfile.apply_async(
        (posMode,Freq,elevation,a,b,c), 
        link=emailfile.s()) 
    deferred.get()
    

    使用的替代方案 链接 ,但在本例中语义相同,将使用 chain