我有一个DAG,它有三个任务流(licappts、agents、agentpolicy):
为了简单起见,我将这三种不同的流称为。流是独立的,从某种意义上说,仅仅因为agentpolicy失败并不意味着其他两个(licappts和agents)应该受到其他流失败的影响。
但是对于
sourcetype
emr_task_1 tasks(即,licappts_emr_task_1、agents_emr_task_1和agentpolicy_emr_task_1),我一次只能运行其中一个任务。例如,我不能同时运行agents_emr_task_1和agentpolicy_emr_task_1,即使它们是两个不一定关心彼此的独立任务。
如何在气流中实现此功能?现在,我唯一能想到的就是将该任务包装在一个脚本中,该脚本以某种方式锁定了一个全局变量,然后如果该变量被锁定,我将让脚本执行一个线程。休眠(60秒)或其他操作,然后重试。但这看起来很糟糕,我很好奇气流是否提供了解决方案。
如果需要的话,我可以调整DAG的顺序。我想做的一件事是对
dag starts->…->licappts_emr_task_1->代理_emr_task_1->agentpolicy_emr_task_1->DAG完成
< /代码>
但我不认为以这种方式组合流,因为例如,agentpolicy“emr”task“1”必须等待其他两个任务完成才能开始,有时agentpolicy“emr”task“1”在其他两个任务完成之前就准备好了。
所以理想情况下,我希望先启动准备就绪的任何任务,然后阻止其他任务运行它们的sourcetypesourcetype,直到它完成。
更新:
我刚才想到的另一个解决方案是,如果有一种方法可以让我检查另一个任务的状态,我可以为sourcetype创建一个脚本,检查其他两个任务中是否有任何一个有运行状态,如果有运行状态,它将休眠,并定期检查是否没有其他任务正在运行,在这种情况下,它将启动进程。不过,我不太喜欢这种方式,因为我觉得这可能会导致一种比赛状态,在这种情况下,两人都会(同时)读到“没有人在跑步,两人都开始跑步”。
为了简单起见,我将这三种不同的流称为。流是独立的,因为代理策略失败并不意味着其他两个(许可证和代理)应该受到其他流失败的影响。
但是为了原料药_ emr_task_1 tasks(即,licappts_emr_task_1、agents_emr_task_1和agentpolicy_emr_task_1)我一次只能运行其中一个任务。例如,我不能同时运行agents-emr-task-1和agentpolicy-emr-task-1,即使它们是两个彼此不必关心的独立任务。
如何在气流中实现此功能?现在,我唯一能想到的就是将该任务包装在一个脚本中,该脚本以某种方式锁定了一个全局变量,然后如果该变量被锁定,我将让脚本执行一个线程。休眠(60秒)或其他操作,然后重试。但这看起来很糟糕,我很好奇气流是否能解决这个问题。
如果需要的话,我可以调整DAG的顺序。我想做的一件事是对
Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished
但我不认为以这种方式组合流,因为例如,agentpolicy“emr”task“1”必须等待其他两个任务完成才能开始,有时agentpolicy“emr”task“1”在其他两个任务完成之前就准备好了。
所以理想情况下我想要什么原料药_ emr_task_1首先启动准备就绪的任务,然后阻止其他任务运行它们的原料药_ emr_task_1任务直到完成。
更新:
我刚才想到的另一个解决方案是,如果有一种方法可以让我检查另一个任务的状态,我可以为它创建一个脚本原料药_ emr_task_1检查是否有其他两个原料药_ emr_task_1任务具有运行状态,如果它们这样做,它将休眠并定期检查其他任务是否正在运行,在这种情况下,它将启动进程。不过,我不太喜欢这种方式,因为我觉得这可能会导致一种比赛状态,两人(同时)都认为没有人在跑,两人都开始跑。