
Airflow: ExternalTaskSensor does not trigger the task

  •  2
  • Darshan Mehta  · Tech Community  · 6 years ago

    I am trying to trigger a dependent DAG once the main DAG completes, using an ExternalTaskSensor. Here is my main DAG:

    from airflow import DAG
    from airflow.operators.jdbc_operator import JdbcOperator
    from datetime import datetime
    from airflow.operators.bash_operator import BashOperator
    
    today = datetime.today()
    
    default_args = {
        'depends_on_past': False,
        'retries': 0,
        'start_date': datetime(today.year, today.month, today.day),
        'schedule_interval': '@once'
    }
    
    dag = DAG('call-procedure-and-bash', default_args=default_args)
    
    call_procedure = JdbcOperator(
        task_id='call_procedure',
        jdbc_conn_id='airflow_db2',
        sql='CALL AIRFLOW.TEST_INSERT (20)',
        dag=dag
    )
    
    call_procedure
    

    And here is my dependent DAG:

    from airflow import DAG
    from airflow.operators.jdbc_operator import JdbcOperator
    from datetime import datetime, timedelta
    from airflow.sensors.external_task_sensor import ExternalTaskSensor
    
    today = datetime.today()
    
    default_args = {
        'depends_on_past': False,
        'retries': 0,
        'start_date': datetime(today.year, today.month, today.day),
        'schedule_interval': '@once'
    }
    
    dag = DAG('external-dag-upstream', default_args=default_args)
    
    task_sensor = ExternalTaskSensor(
        task_id='link_upstream',
        external_dag_id='call-procedure-and-bash',
        external_task_id='call_procedure',
        execution_delta=timedelta(minutes=-2),
        dag=dag
    )
    
    count_rows = JdbcOperator(
        task_id='count_rows',
        jdbc_conn_id='airflow_db2',
        sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
        dag=dag
    )
    
    count_rows.set_upstream(task_sensor)
    

    Here are the logs of the dependent DAG after the main DAG was executed:

    [2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
    [2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

    And here are the execution logs of the main DAG:
    

    [2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
    [2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
    [2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0
    

    My assumption is that Airflow should trigger the dependent DAG if the main one runs fine? I tried playing with execution_delta, but that does not seem to work.

    Also, schedule_interval and start_date are the same for both DAGs, so I don't think that should cause any trouble.

    Am I missing anything?

    2 Answers  |  as of 6 years ago
        1
  •  1
  •   psius1    5 years ago

    Make sure both DAGs are started at the same time, and don't trigger either of them manually.

        2
  •  0
  •   dlamblin    6 years ago

    You probably ought to use a positive timedelta: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html because with a negative execution_delta, once it is subtracted, the sensor ends up looking for a task that ran two minutes after its own run.

    But the delta is not really a range: there must be a TaskInstance with a matching DAG id, task id, a successful state, and an execution date that appears in a list of datetimes. When you pass execution_delta as a timedelta, that list of datetimes is just the current execution date minus the timedelta.
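    To make that concrete, here is a small sketch (my own simplification, not the actual Airflow source) of how the sensor derives the execution date it pokes for. With the question's execution_delta of -2 minutes, subtracting it pushes the target date two minutes forward, which is exactly why the log shows the sensor poking for 11:45:47 from a run that started around 11:43:47:

    ```python
    from datetime import datetime, timedelta

    # Simplified model of ExternalTaskSensor's date matching (Airflow 1.10-era):
    # it pokes for its own execution_date minus execution_delta.
    def poked_execution_dates(execution_date, execution_delta=None):
        if execution_delta:
            return [execution_date - execution_delta]
        return [execution_date]

    # Subtracting a negative delta moves the poked date *forward* two minutes.
    run = datetime(2019, 1, 10, 11, 43, 47)
    print(poked_execution_dates(run, timedelta(minutes=-2)))
    # → [datetime.datetime(2019, 1, 10, 11, 45, 47)]
    ```

    Since the main DAG's run never has that future execution date, the sensor pokes forever.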

    It probably comes down to either removing the timedelta, so that the two execution dates match and the sensor simply waits for the other task to succeed, or recognizing that with both start dates and schedule intervals essentially set to today plus @once, the execution dates you get are not in predictable lockstep with each other. You could try datetime(2019, 1, 10) with a schedule of 0 1 * * * to have them both run daily at 1 AM.
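    For reference, a minimal sketch of that suggestion (fixed start_date, the same cron schedule on both DAGs, and no execution_delta), assuming the Airflow 1.10-era import paths used in the question:

    ```python
    from datetime import datetime
    from airflow import DAG
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    default_args = {'depends_on_past': False, 'retries': 0}

    # Note: schedule_interval belongs on the DAG itself, not in default_args
    # (task-level defaults), where it is silently ignored. A fixed start_date
    # plus identical schedules keeps both DAGs' execution dates aligned.
    main_dag = DAG(
        'call-procedure-and-bash',
        default_args=default_args,
        start_date=datetime(2019, 1, 10),
        schedule_interval='0 1 * * *',
    )

    dependent_dag = DAG(
        'external-dag-upstream',
        default_args=default_args,
        start_date=datetime(2019, 1, 10),
        schedule_interval='0 1 * * *',
    )

    # With no execution_delta, the sensor pokes for its own execution date,
    # which now matches the main DAG's run exactly.
    task_sensor = ExternalTaskSensor(
        task_id='link_upstream',
        external_dag_id='call-procedure-and-bash',
        external_task_id='call_procedure',
        dag=dependent_dag,
    )
    ```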