
Problem with a branching task

  •  1
  • Georgi Raychev  · Tech Community  · 6 years ago

    Visually, my DAG looks like this:

    [Image: DAG graph]

    The code itself looks like this:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import BranchPythonOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 10, 9)
    }
    
    now = datetime.now()
    minute_check = now.minute % 5
    
    dag = DAG(
        dag_id='test3',
        default_args=default_args,
        schedule_interval='* * * * *',
        dagrun_timeout=timedelta(minutes=5),
        catchup=False,
        max_active_runs=99
    )
    
    def check_minute():
        if minute_check == 0:
            return "branch_fiveminute"
        else:
            return "branch_minute"
    
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=check_minute,
        trigger_rule='all_done',
        dag=dag)
    
    branch_minute = BashOperator(
        task_id='branch_minute',
        bash_command='test1min.sh ',
        trigger_rule='all_done',
        dag=dag)
    
    branch_fiveminute = BashOperator(
        task_id='branch_fiveminute',
        bash_command='test5min.sh ',
        trigger_rule='all_done',
        dag=dag)
    
    branch_task.set_downstream(branch_minute)
    branch_task.set_downstream(branch_fiveminute)
    branch_fiveminute.set_downstream(branch_minute)
    


    I have tried playing with the trigger rule settings, but without success.

    What is going wrong? In case it matters, I am using Airflow 1.10.

    1 Answer  |  active until 5 years ago
  •  1
  •   Georgi Raychev    6 years ago

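    Your 1-minute task is skipped because it sits on a different execution path from the 5-minute task. This is a little counter-intuitive given the diagram, but only one path is executed: in Airflow 1.10 the branch operator marks every task directly downstream of it that it did not select as skipped, and branch_minute is directly downstream of branch_task.

    Instead, put the branch at the start and have one path lead to a dummy operator (the "false" case) and the other to the 5-minute task, with both the dummy operator and the 5-minute task leading into the 1-minute task. This way the dummy task gets skipped, but the execution flow ends up in the 1-minute task regardless of which execution path was selected.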

    from airflow import DAG
    from airflow.operators.python_operator import BranchPythonOperator
    from airflow.operators.dummy_operator  import DummyOperator
    from airflow.operators.bash_operator   import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 10, 9)
    }
    
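    dag = DAG(
        dag_id='test3',
        default_args=default_args,
        schedule_interval='* * * * *',
        dagrun_timeout=timedelta(minutes=5),
        catchup=False,
        max_active_runs=99
    )
    
    def check_minute():
        # Read the clock when the task runs, not when the DAG file is parsed.
        if datetime.now().minute % 5 == 0:
            # Every fifth minute: run the 5-minute task, then the 1-minute task.
            return "branch_fiveminute"
        else:
            # Otherwise go through the dummy task straight to the 1-minute task.
            return "branch_false_1"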
    
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=check_minute,
        trigger_rule='all_done',
        dag=dag)
    
    branch_minute = BashOperator(
        task_id='branch_minute',
        bash_command='test1min.sh ',
        trigger_rule='all_done',
        dag=dag)
    
    branch_fiveminute = BashOperator(
        task_id='branch_fiveminute',
        bash_command='test5min.sh ',
        trigger_rule='all_done',
        dag=dag)
    
    # Placeholder for the "not a fifth minute" path; it does no work itself.
    branch_false_1 = DummyOperator(task_id='branch_false_1', dag=dag)
    
    branch_task.set_downstream(branch_false_1)
    branch_task.set_downstream(branch_fiveminute)
    branch_fiveminute.set_downstream(branch_minute)
    branch_false_1.set_downstream(branch_minute)
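
    To see why the first graph skips the 1-minute task and the second one does not, here is a rough pure-Python model of the behaviour described above. It is only a sketch, not Airflow's code: simulate, ORIGINAL_EDGES and FIXED_EDGES are made-up names, and it assumes that tasks never fail and that every non-branch task behaves like trigger_rule='all_done'.

```python
from collections import defaultdict


def simulate(edges, branch_task, chosen):
    """Return {task_id: state} for one run of a toy DAG.

    Simplified model (an assumption, not Airflow's implementation):
    - the branch task succeeds and marks every direct downstream task
      other than `chosen` as skipped;
    - every other task behaves like trigger_rule='all_done': it runs once
      all of its upstream tasks have reached some final state;
    - a task that was already marked skipped never runs.
    """
    upstream = defaultdict(list)
    downstream = defaultdict(list)
    for up, down in edges:
        upstream[down].append(up)
        downstream[up].append(down)

    state = {branch_task: "success"}
    for child in downstream[branch_task]:
        if child != chosen:
            state[child] = "skipped"

    # Repeatedly run every task whose upstream tasks have all finished.
    pending = {task for edge in edges for task in edge} - set(state)
    while pending:
        ready = [t for t in pending if all(u in state for u in upstream[t])]
        if not ready:
            raise ValueError("edges contain a cycle")
        for task in ready:
            state[task] = "success"
            pending.remove(task)
    return state


# Graph from the question: branch_minute hangs directly off branch_task.
ORIGINAL_EDGES = [
    ("branch_task", "branch_minute"),
    ("branch_task", "branch_fiveminute"),
    ("branch_fiveminute", "branch_minute"),
]

# Graph from the answer: a dummy task takes the place of the direct edge.
FIXED_EDGES = [
    ("branch_task", "branch_false_1"),
    ("branch_task", "branch_fiveminute"),
    ("branch_fiveminute", "branch_minute"),
    ("branch_false_1", "branch_minute"),
]

for name, edges in (("original", ORIGINAL_EDGES), ("fixed", FIXED_EDGES)):
    result = simulate(edges, "branch_task", "branch_fiveminute")
    print(name, "->", "branch_minute is", result["branch_minute"])
```

    In this model, choosing branch_fiveminute leaves branch_minute skipped in the original graph, because it is a direct child of the branch task. In the revised graph only branch_false_1 is skipped, and branch_minute still runs once both of its upstream tasks are done.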