代码之家  ›  专栏  ›  技术社区  ›  Volatil3

气流:如何确保DAG每5分钟运行一次?

  •  1
  • Volatil3  · 技术社区  · 6 年前

    我在探索 Apache Airflow . 我使用的方法是在MySQL中插入一条记录。

    我已经安排好了 DAG 每5分钟运行一次,但似乎没有发生,因为MYSQL timestamp告诉我们MYSQL任务在5分钟内执行了很多次。

    enter image description here

    正如您所看到的,它正在几分钟内插入记录。下面是我的代码:

    import datetime as dt
    
    from airflow import DAG
    from airflow.hooks.mysql_hook import MySqlHook
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    
    def fetch_data_mysql():
        mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
        sql = 'SELECT * from random_table'
        sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"
        print('INSERT MYSQL RESULT')
        # results = mysql_hook.get_records(sql)
        # results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))
        mysql_hook.run(sql, autocommit=True)
    
    def print_world():
        print('world')
        return 'WORLD IN SEPTEMBER'
    
    
    default_args = {
        'owner': 'me',
        'start_date': dt.datetime(2018, 9, 11),
        'retries': 1,
        'retry_delay': dt.timedelta(minutes=2),
    }
    
    with DAG('airflow_tutorial_v01',
             default_args=default_args,
             schedule_interval='0/5 * * * *',
             ) as dag:
        print_hello = BashOperator(task_id='print_hello',
                                   bash_command='echo "hello"')
        sleep = BashOperator(task_id='sleep',
                             bash_command='sleep 5')
        print_world = PythonOperator(task_id='print_world',
                                     python_callable=print_world)
        mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)
    
    print_hello >> sleep >> print_world >> mysql_task
    

    我正在使用 v1.10.0 .

    给出了日志的链接here:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0

    2 回复  |  直到 6 年前
        1
  •  4
  •   kaxil    6 年前

    你这家伙在回填。如果你检查日志,它的执行日期是 2018-09-20 00:15:00+00:00 , 2018-09-20 00:20:00+00:00 , 2018-09-20 00:25:00+00:00 ,等等。

    default_args :

    'catchup_by_default': False

    你的 默认参数

    default_args = {
        'owner': 'me',
        'start_date': dt.datetime(2018, 9, 11),
        'retries': 1,
        'retry_delay': dt.timedelta(minutes=2),
        'catchup_by_default': False,
    }
    
        2
  •  0
  •   chris.mclennon    6 年前

    尝试更改cron计划 0/5 * * * * */5 * * * * . 后者是每五分钟一次,而前者似乎是一个非标准cron语法 crontab.guru