代码之家  ›  专栏  ›  技术社区  ›  Kyle Bridenstine

气流-任务之间的锁定,以便一次只运行一个并行任务?

  •  2
  • Kyle Bridenstine  · 技术社区  · 6 年前

    我有一个DAG,它有三个任务流(licappts、agents、agentpolicy):

    为了简单起见,我将这三种不同的流称为。流是独立的,从某种意义上说,仅仅因为agentpolicy失败并不意味着其他两个(licappts和agents)应该受到其他流失败的影响。

    但是对于 sourcetype emr_task_1 tasks(即,licappts_emr_task_1、agents_emr_task_1和agentpolicy_emr_task_1),我一次只能运行其中一个任务。例如,我不能同时运行agents_emr_task_1和agentpolicy_emr_task_1,即使它们是两个不一定关心彼此的独立任务。

    如何在气流中实现此功能?现在,我唯一能想到的就是将该任务包装在一个脚本中,该脚本以某种方式锁定了一个全局变量,然后如果该变量被锁定,我将让脚本执行一个线程。休眠(60秒)或其他操作,然后重试。但这看起来很糟糕,我很好奇气流是否提供了解决方案。

    如果需要的话,我可以调整DAG的顺序。我想做的一件事是对

    dag starts->…->licappts_emr_task_1->代理_emr_task_1->agentpolicy_emr_task_1->DAG完成
    < /代码> 
    
    

    但我不认为以这种方式组合流,因为例如,agentpolicy“emr”task“1”必须等待其他两个任务完成才能开始,有时agentpolicy“emr”task“1”在其他两个任务完成之前就准备好了。

    所以理想情况下,我希望先启动准备就绪的任何任务,然后阻止其他任务运行它们的sourcetypesourcetype,直到它完成。

    更新:

    我刚才想到的另一个解决方案是,如果有一种方法可以让我检查另一个任务的状态,我可以为sourcetype创建一个脚本,检查其他两个任务中是否有任何一个有运行状态,如果有运行状态,它将休眠,并定期检查是否没有其他任务正在运行,在这种情况下,它将启动进程。不过,我不太喜欢这种方式,因为我觉得这可能会导致一种比赛状态,在这种情况下,两人都会(同时)读到“没有人在跑步,两人都开始跑步”。

    为了简单起见,我将这三种不同的流称为。流是独立的,因为代理策略失败并不意味着其他两个(许可证和代理)应该受到其他流失败的影响。

    但是为了原料药_ emr_task_1 tasks(即,licappts_emr_task_1、agents_emr_task_1和agentpolicy_emr_task_1)我一次只能运行其中一个任务。例如,我不能同时运行agents-emr-task-1和agentpolicy-emr-task-1,即使它们是两个彼此不必关心的独立任务。

    如何在气流中实现此功能?现在,我唯一能想到的就是将该任务包装在一个脚本中,该脚本以某种方式锁定了一个全局变量,然后如果该变量被锁定,我将让脚本执行一个线程。休眠(60秒)或其他操作,然后重试。但这看起来很糟糕,我很好奇气流是否能解决这个问题。

    如果需要的话,我可以调整DAG的顺序。我想做的一件事是对

    Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished
    

    但我不认为以这种方式组合流,因为例如,agentpolicy“emr”task“1”必须等待其他两个任务完成才能开始,有时agentpolicy“emr”task“1”在其他两个任务完成之前就准备好了。

    所以理想情况下我想要什么原料药_ emr_task_1首先启动准备就绪的任务,然后阻止其他任务运行它们的原料药_ emr_task_1任务直到完成。

    更新:

    我刚才想到的另一个解决方案是,如果有一种方法可以让我检查另一个任务的状态,我可以为它创建一个脚本原料药_ emr_task_1检查是否有其他两个原料药_ emr_task_1任务具有运行状态,如果它们这样做,它将休眠并定期检查其他任务是否正在运行,在这种情况下,它将启动进程。不过,我不太喜欢这种方式,因为我觉得这可能会导致一种比赛状态,两人(同时)都认为没有人在跑,两人都开始跑。

    1 回复  |  直到 6 年前
        1
  •  5
  •   cwurtz    6 年前

    你可以使用 pool 以确保这些任务的并行性为1。

    对于每一个 *_emr_task_1 任务,设置 pool Kwarg就是这样的人 pool=emr_task .

    然后进入Web服务器->管理->池->创建: 设置名称 Pool 以匹配操作员使用的池,以及 Slots 1岁。

    这将确保调度程序将只允许在配置的插槽数之内为该池排队的任务,而不管其余气流的并行性如何。